Runtime environment for sequence execution and communication with Transform Hub host.
npx @tessl/cli install tessl/npm-scramjet--runner@1.0.0
@scramjet/runner is a runtime environment for sequence execution that provides communication mechanisms with the Scramjet Transform Hub host. It enables remote execution of data transformation sequences with comprehensive control, monitoring, and stream management capabilities.
npm install @scramjet/runner
import { Runner, MessageUtils } from "@scramjet/runner";
For CommonJS:
const { Runner, MessageUtils } = require("@scramjet/runner");
import { Runner, MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";
// This package is primarily designed for use by the Scramjet Transform Hub
// infrastructure. The Runner class manages sequence execution internally.
// MessageUtils can be used for writing monitoring messages to streams
// Annotate the message as a two-element tuple. Without the annotation the
// literal is inferred as an array of a union type, which is not assignable to
// the [RunnerMessageCode, any] tuple that writeMessageOnStream declares for
// its `message` parameter.
const monitoringMessage: [RunnerMessageCode, { healthy: boolean }] = [
  RunnerMessageCode.MONITORING,
  { healthy: true },
];
// Write the message to the monitoring stream.
MessageUtils.writeMessageOnStream(monitoringMessage, monitorStream);
// In sequence functions, the context is available as 'this'
/**
 * Example sequence: copies every incoming item and adds `processed: true`.
 *
 * The runner context is bound as `this`. It is used to register a stop
 * handler and to call `keepAlive` before the mapped stream is returned.
 *
 * @param input - Stream of items to transform.
 * @returns The result of `input.map`, each item spread into a new object with
 *   `processed: true` added.
 */
export default function(this: RunnerAppContext<any, any>, input: DataStream) {
  // Lifecycle hook: the handler only logs a message when it is invoked.
  this.addStopHandler(async (_timeout, _canKeepAlive) => {
    console.log("Sequence stopping gracefully");
  });

  // Keep-alive call with the value 5000.
  this.keepAlive(5000);

  const tagged = input.map((record) => {
    return { ...record, processed: true };
  });
  return tagged;
}
@scramjet/runner is built around several key components:
Core sequence execution environment that manages the complete lifecycle of data transformation sequences.
/**
 * Signature listing of the Runner class (members are shown without bodies).
 *
 * Generic over `X`, the application config type, which is forwarded to the
 * `context` property. Implements `IComponent`, hence the `logger` member.
 */
class Runner<X extends AppConfig> implements IComponent {
  /**
   * @param sequencePath - Path of the sequence, as a string.
   * @param hostClient - Client object satisfying `IHostClient`.
   * @param instanceId - Identifier of the instance, as a string.
   * @param sequenceInfo - Sequence id and config (see `SequenceInfo`).
   * @param runnerConnectInfo - App config and optional args (see `RunnerConnectInfo`).
   */
  constructor(
    sequencePath: string,
    hostClient: IHostClient,
    instanceId: string,
    sequenceInfo: SequenceInfo,
    runnerConnectInfo: RunnerConnectInfo
  );
  // Asynchronous entry point; resolves with no value.
  // NOTE(review): presumably starts the sequence lifecycle — confirm against the implementation.
  main(): Promise<void>;
  // Logger required by IComponent.
  logger: IObjectLogger;
  // Context object typed with this runner's config type; state type is `any`.
  context: RunnerAppContext<X, any>;
}
Internal network communication layer managing connections and data streams between runner and Transform Hub host. This is used internally by the Runner class and not exposed as part of the public API.
Runtime context for sequences providing lifecycle management, event handling, and host integration APIs. This context is accessible within sequence functions as `this` and through the Runner's `context` property.
// Available through runner.context or as 'this' in sequence functions
/**
 * Context object exposed to sequences.
 *
 * Every method returns `this`, so calls can be chained.
 *
 * Type parameters: `AppConfigType` is the type of `config`; `State` is
 * declared but not referenced by any member shown here.
 */
interface RunnerAppContext<AppConfigType extends AppConfig, State extends any> {
  // Application configuration, typed by the first type parameter.
  config: AppConfigType;
  logger: IObjectLogger;
  // Client objects; HostClient and ManagerClient are declared elsewhere.
  hub: HostClient;
  space: ManagerClient;
  instanceId: string;
  // Handler registration (see KillHandler, StopHandler, MonitoringHandler).
  addKillHandler(handler: KillHandler): this;
  addStopHandler(handler: StopHandler): this;
  addMonitoringHandler(handler: MonitoringHandler): this;
  // Optional duration argument; the parameter name states the unit.
  keepAlive(milliseconds?: number): this;
  end(): this;
  // Optional error argument.
  destroy(error?: AppError): this;
  // Event subscription and emission keyed by event name; the message is optional.
  on(eventName: string, handler: (message?: any) => void): this;
  emit(eventName: string, message?: any): this;
}
Utility functions for encoding and transmitting monitoring messages through streams.
/**
 * Signature listing of MessageUtils (static member shown without a body).
 */
class MessageUtils {
  /**
   * Writes a monitoring message to a stream.
   *
   * @param message - `[code, payload]` tuple (see `EncodedMonitoringMessage`).
   * @param streamToWrite - Target exposing `write(chunk)` (see `WritableStream`).
   */
  static writeMessageOnStream(
    message: EncodedMonitoringMessage,
    streamToWrite: WritableStream<any>
  ): void;
}
Internal stream processing utilities for handling different content types and stream formats. These are used internally by the Runner and not exposed as part of the public API.
/** Free-form configuration object: any string key mapped to a value of any type. */
interface AppConfig {
  [key: string]: any;
}
/** Sequence descriptor accepted by the Runner constructor as `sequenceInfo`. */
interface SequenceInfo {
  // String identifier of the sequence.
  sequenceId: string;
  // String-keyed configuration with values of any type.
  config: Record<string, any>;
}
/** Connection info accepted by the Runner constructor as `runnerConnectInfo`. */
interface RunnerConnectInfo {
  appConfig: AppConfig;
  // Optional argument list; element types are not constrained.
  args?: any[];
}
/**
 * Set of send/notify operations, none of which return a value.
 * NOTE(review): presumably the surface the app context uses to reach the runner — confirm.
 */
interface RunnerProxy {
  // Takes a payload carrying a numeric `keepAlive` field.
  sendKeepAlive(data: KeepAliveMessageData): void;
  // The error argument is optional and may be an AppError or a plain Error.
  sendStop(error?: AppError | Error): void;
  // Takes an event name plus an optional message.
  sendEvent(ev: EventMessageData): void;
  keepAliveIssued(): void;
}
/** Minimal component contract: exposes a logger. Implemented by `Runner`. */
interface IComponent {
  logger: IObjectLogger;
}
// Encoded messages are two-element tuples: a RunnerMessageCode followed by a
// payload of any type. Both aliases have the same shape.
type EncodedMonitoringMessage = [RunnerMessageCode, any];
type EncodedControlMessage = [RunnerMessageCode, any];
// Handler signatures accepted by RunnerAppContext's add*Handler methods.
// Kill handler: no arguments, no return value.
type KillHandler = () => void;
// Stop handler: receives a numeric timeout and a boolean flag; returns a promise.
type StopHandler = (timeout: number, canCallKeepalive: boolean) => Promise<void>;
// Monitoring handler: receives a monitoring payload and resolves with one of the same type.
type MonitoringHandler = (message: MonitoringMessageFromRunnerData) => Promise<MonitoringMessageFromRunnerData>;
/** Monitoring payload exchanged with a `MonitoringHandler`; carries one boolean health flag. */
interface MonitoringMessageFromRunnerData {
  healthy: boolean;
}
/** Payload type of `RunnerProxy.sendKeepAlive`. */
interface KeepAliveMessageData {
  // NOTE(review): unit is not stated here; presumably milliseconds, matching
  // RunnerAppContext.keepAlive(milliseconds) — confirm.
  keepAlive: number;
}
/** Payload type of `RunnerProxy.sendEvent`: an event name plus an optional message. */
interface EventMessageData {
  eventName: string;
  // Optional; may be of any type.
  message?: any;
}
/**
 * Message payload in which every field is an optional string.
 * NOTE(review): field meanings are not shown in this document — confirm against the protocol definition.
 */
interface PangMessageData {
  requires?: string;
  provides?: string;
  contentType?: string;
  outputEncoding?: string;
}
/** Function descriptor with two required string fields and arbitrary extra properties. */
interface FunctionDefinition {
  mode: string;
  name: string;
  // Any additional string-keyed property is permitted, with a value of any type.
  [key: string]: any;
}
/**
 * Client contract accepted by the Runner constructor as `hostClient`.
 *
 * Exposes asynchronous init and disconnect operations, an agent accessor,
 * and a fixed set of read-only stream properties.
 */
interface IHostClient {
  logger: IObjectLogger;
  // Asynchronous initialisation taking a string id.
  init(id: string): Promise<void>;
  // Asynchronous disconnect.
  // NOTE(review): the effect of the `hard` flag is not shown here — confirm.
  disconnect(hard: boolean): Promise<void>;
  getAgent(): Agent;
  // Streams typed Readable.
  readonly stdinStream: Readable;
  // Streams typed Writable.
  readonly stdoutStream: Writable;
  readonly stderrStream: Writable;
  readonly controlStream: Readable;
  readonly monitorStream: Writable;
  readonly inputStream: Readable;
  readonly outputStream: Writable;
  readonly logStream: Writable;
  readonly packageStream: Readable;
}
/**
 * Minimal writable-stream shape: a single `write` method returning a boolean.
 * This is the type of `streamToWrite` in `MessageUtils.writeMessageOnStream`.
 */
interface WritableStream<T> {
  write(chunk: T): boolean;
}
/**
 * Callable type: a function taking any arguments and returning any value,
 * which may also carry optional `requires` and `contentType` string properties.
 */
interface ApplicationInterface {
  requires?: string;
  contentType?: string;
  // Call signature.
  (...args: any[]): any;
}
/** Optional topic metadata: a topic name and a content type, both strings. */
interface HasTopicInformation {
  topic?: string;
  contentType?: string;
}
/** Instance status values. String enum: each member's value is its lower-case name. */
enum InstanceStatus {
  STARTING = "starting",
  RUNNING = "running",
  STOPPING = "stopping",
  COMPLETED = "completed",
  ERRORED = "errored",
  KILLING = "killing"
}
/**
 * Numeric runner exit codes, assigned explicitly from 0 to 7.
 * NOTE(review): presumably used as the runner's process exit status — confirm.
 */
enum RunnerExitCode {
  STOPPED = 0,
  KILLED = 1,
  SEQUENCE_FAILED_ON_START = 2,
  SEQUENCE_FAILED_DURING_EXECUTION = 3,
  UNCAUGHT_EXCEPTION = 4,
  CLEANUP_FAILED = 5,
  INVALID_ENV_VARS = 6,
  INVALID_SEQUENCE_PATH = 7
}
/** A value that is either available directly or wrapped in a promise. */
type MaybePromise<T> = T | Promise<T>;