or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application-context.mdhost-communication.mdindex.mdmessage-utilities.mdrunner-execution.mdstream-processing.md
tile.json

tessl/npm-scramjet--runner

Runtime environment for sequence execution and communication with Transform Hub host.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/@scramjet/runner@1.0.x

To install, run

npx @tessl/cli install tessl/npm-scramjet--runner@1.0.0

index.mddocs/

@scramjet/runner

@scramjet/runner is a runtime environment for sequence execution that provides communication mechanisms with the Scramjet Transform Hub host. It enables remote execution of data transformation sequences with comprehensive control, monitoring, and stream management capabilities.

Package Information

  • Package Name: @scramjet/runner
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @scramjet/runner

Core Imports

import { Runner, MessageUtils } from "@scramjet/runner";

For CommonJS:

const { Runner, MessageUtils } = require("@scramjet/runner");

Basic Usage

import { Runner, MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";

// This package is primarily designed for use by the Scramjet Transform Hub
// infrastructure. The Runner class manages sequence execution internally.

// MessageUtils can be used for writing monitoring messages to streams
const monitoringMessage = [RunnerMessageCode.MONITORING, { healthy: true }];
MessageUtils.writeMessageOnStream(monitoringMessage, monitorStream);

// In sequence functions, the context is available as 'this'
export default function(this: RunnerAppContext<any, any>, input: DataStream) {
  // Use context methods for lifecycle management
  this.addStopHandler(async (timeout, canKeepAlive) => {
    console.log("Sequence stopping gracefully");
  });
  
  // Send keep-alive if needed
  this.keepAlive(5000);
  
  return input.map(item => ({ ...item, processed: true }));
}

Architecture

@scramjet/runner is built around several key components:

  • Runner Class: Core execution engine that manages sequence lifecycle, stream handling, and host communication
  • HostClient: Network communication layer managing multiple streams to/from the Transform Hub host
  • RunnerAppContext: Application context providing APIs for sequences including event handling, monitoring, and lifecycle management
  • MessageUtils: Utility functions for encoding and writing monitoring messages to streams
  • Stream Management: Comprehensive handling of input/output streams with content type detection and serialization
  • Communication Protocol: JSON-based messaging system with multiple communication channels (control, monitoring, data streams)

Capabilities

Runner Execution

Core sequence execution environment that manages the complete lifecycle of data transformation sequences.

class Runner<X extends AppConfig> implements IComponent {
  constructor(
    sequencePath: string,
    hostClient: IHostClient,
    instanceId: string,
    sequenceInfo: SequenceInfo,
    runnerConnectInfo: RunnerConnectInfo
  );
  
  main(): Promise<void>;
  logger: IObjectLogger;
  context: RunnerAppContext<X, any>;
}

Runner Execution

Host Communication

Internal network communication layer managing connections and data streams between runner and Transform Hub host. This is used internally by the Runner class and not exposed as part of the public API.

Host Communication

Application Context

Runtime context for sequences providing lifecycle management, event handling, and host integration APIs. This context is accessible within sequence functions as this and through the Runner's context property.

// Available through runner.context or as 'this' in sequence functions
interface RunnerAppContext<AppConfigType extends AppConfig, State extends any> {
  config: AppConfigType;
  logger: IObjectLogger;
  hub: HostClient;
  space: ManagerClient;
  instanceId: string;
  
  addKillHandler(handler: KillHandler): this;
  addStopHandler(handler: StopHandler): this;
  addMonitoringHandler(handler: MonitoringHandler): this;
  keepAlive(milliseconds?: number): this;
  end(): this;
  destroy(error?: AppError): this;
  on(eventName: string, handler: (message?: any) => void): this;
  emit(eventName: string, message?: any): this;
}

Application Context

Message Utilities

Utility functions for encoding and transmitting monitoring messages through streams.

class MessageUtils {
  static writeMessageOnStream(
    message: EncodedMonitoringMessage,
    streamToWrite: WritableStream<any>
  ): void;
}

Message Utilities

Stream Processing

Internal stream processing utilities for handling different content types and stream formats. These are used internally by the Runner and not exposed as part of the public API.

Stream Processing

Core Types

interface AppConfig {
  [key: string]: any;
}

interface SequenceInfo {
  sequenceId: string;
  config: Record<string, any>;
}

interface RunnerConnectInfo {
  appConfig: AppConfig;
  args?: any[];
}

interface RunnerProxy {
  sendKeepAlive(data: KeepAliveMessageData): void;
  sendStop(error?: AppError | Error): void;
  sendEvent(ev: EventMessageData): void;
  keepAliveIssued(): void;
}

interface IComponent {
  logger: IObjectLogger;
}

type EncodedMonitoringMessage = [RunnerMessageCode, any];
type EncodedControlMessage = [RunnerMessageCode, any];

type KillHandler = () => void;
type StopHandler = (timeout: number, canCallKeepalive: boolean) => Promise<void>;
type MonitoringHandler = (message: MonitoringMessageFromRunnerData) => Promise<MonitoringMessageFromRunnerData>;

interface MonitoringMessageFromRunnerData {
  healthy: boolean;
}

interface KeepAliveMessageData {
  keepAlive: number;
}

interface EventMessageData {
  eventName: string;
  message?: any;
}

interface PangMessageData {
  requires?: string;
  provides?: string;
  contentType?: string;
  outputEncoding?: string;
}

interface FunctionDefinition {
  mode: string;
  name: string;
  [key: string]: any;
}

interface IHostClient {
  logger: IObjectLogger;
  
  init(id: string): Promise<void>;
  disconnect(hard: boolean): Promise<void>;
  getAgent(): Agent;
  
  readonly stdinStream: Readable;
  readonly stdoutStream: Writable;
  readonly stderrStream: Writable;
  readonly controlStream: Readable;
  readonly monitorStream: Writable;
  readonly inputStream: Readable;
  readonly outputStream: Writable;
  readonly logStream: Writable;
  readonly packageStream: Readable;
}

interface WritableStream<T> {
  write(chunk: T): boolean;
}

interface ApplicationInterface {
  requires?: string;
  contentType?: string;
  (...args: any[]): any;
}

interface HasTopicInformation {
  topic?: string;
  contentType?: string;
}

enum InstanceStatus {
  STARTING = "starting",
  RUNNING = "running",
  STOPPING = "stopping",
  COMPLETED = "completed",
  ERRORED = "errored",
  KILLING = "killing"
}

enum RunnerExitCode {
  STOPPED = 0,
  KILLED = 1,
  SEQUENCE_FAILED_ON_START = 2,
  SEQUENCE_FAILED_DURING_EXECUTION = 3,
  UNCAUGHT_EXCEPTION = 4,
  CLEANUP_FAILED = 5,
  INVALID_ENV_VARS = 6,
  INVALID_SEQUENCE_PATH = 7
}

type MaybePromise<T> = T | Promise<T>;