# Message Utilities

The `MessageUtils` class provides a static helper for encoding monitoring messages and writing them to streams in the Scramjet Transform Hub communication protocol.

## Capabilities

### MessageUtils Class

Static utility class for handling message encoding and stream writing in the Transform Hub communication protocol.

```typescript { .api }
/**
 * Utility class for stream messaging operations
 */
class MessageUtils {
  /**
   * Write encoded monitoring message to a stream with proper formatting
   * @param message - Encoded monitoring message as [code, data] tuple
   * @param streamToWrite - Writable stream to send the message to
   * @throws Error if streamToWrite is undefined
   */
  static writeMessageOnStream(
    message: EncodedMonitoringMessage,
    streamToWrite: WritableStream<any>
  ): void;
}
```

**Usage Example:**

```typescript
import { MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";
// Assumption: the message tuple type is exported from @scramjet/types
import type { EncodedMonitoringMessage } from "@scramjet/types";

// `hostClient` is assumed to be an already initialized host client
// that exposes a writable `monitorStream`.

// Send health monitoring message
const healthMessage: EncodedMonitoringMessage = [
  RunnerMessageCode.MONITORING,
  { healthy: true, timestamp: Date.now() }
];

MessageUtils.writeMessageOnStream(healthMessage, hostClient.monitorStream);

// Send keep-alive message
const keepAliveMessage: EncodedMonitoringMessage = [
  RunnerMessageCode.ALIVE,
  { keepAlive: 5000 }
];

MessageUtils.writeMessageOnStream(keepAliveMessage, hostClient.monitorStream);

// Send sequence completion message
const completionMessage: EncodedMonitoringMessage = [
  RunnerMessageCode.SEQUENCE_COMPLETED,
  { timeout: 10000 }
];

MessageUtils.writeMessageOnStream(completionMessage, hostClient.monitorStream);
```

## Message Format

The Transform Hub communication protocol uses JSON-encoded messages with CRLF line termination:

- **Format**: `JSON.stringify([code, data]) + "\r\n"`
- **Structure**: Messages are tuples of `[MessageCode, MessageData]`
- **Encoding**: UTF-8 text encoding
- **Termination**: Messages end with carriage return + line feed (`\r\n`)
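
The framing rules above can be sketched as a small encoder and decoder. This is an illustration only, not the library's implementation: `encodeMessage`, `decodeMessages`, and the `EncodedMessage` type are hypothetical names defined locally, and the numeric codes are taken from the enum listed in this document.

```typescript
// Local stand-in type; the real message types come from the Scramjet packages.
type EncodedMessage = [number, unknown];

// Serialize one [code, data] tuple into a single CRLF-terminated line.
function encodeMessage(message: EncodedMessage): string {
  return JSON.stringify(message) + "\r\n";
}

// Split received text into complete messages.
// Returns the parsed messages plus any trailing, not yet terminated fragment.
function decodeMessages(buffer: string): { messages: EncodedMessage[]; rest: string } {
  const lines = buffer.split("\r\n");
  // The last element is whatever follows the final CRLF (possibly empty).
  const rest = lines.pop() ?? "";
  const messages = lines
    .filter((line) => line.length > 0)
    .map((line) => JSON.parse(line) as EncodedMessage);
  return { messages, rest };
}

// Two complete messages followed by the start of a third.
const wire =
  encodeMessage([8, { healthy: true }]) +
  encodeMessage([11, { keepAlive: 5000 }]) +
  "[12,";

const { messages, rest } = decodeMessages(wire);
console.log(messages.length); // 2
console.log(rest);            // "[12,"
```

Splitting on `\r\n` is safe for this format because `JSON.stringify` escapes carriage returns and line feeds inside string values, so a raw CRLF can only appear as a message terminator.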

## Common Message Types

```typescript { .api }
enum RunnerMessageCode {
  PING = 0,
  PONG = 1,
  KILL = 6,
  STOP = 7,
  MONITORING = 8,
  MONITORING_RATE = 9,
  MONITORING_REPLY = 10,
  ALIVE = 11,
  EVENT = 12,
  SEQUENCE_STOPPED = 13,
  SEQUENCE_COMPLETED = 14,
  PANG = 15,
  ACKNOWLEDGE = 16
}

// Health monitoring message
type HealthMessage = [RunnerMessageCode.MONITORING, { healthy: boolean }];

// Keep-alive message
type KeepAliveMessage = [RunnerMessageCode.ALIVE, { keepAlive: number }];

// Event message
type EventMessage = [RunnerMessageCode.EVENT, { eventName: string; message?: any }];

// Sequence completion message
type CompletionMessage = [RunnerMessageCode.SEQUENCE_COMPLETED, { timeout: number }];

// PANG message (input/output requirements)
type PangMessage = [RunnerMessageCode.PANG, {
  requires?: string;
  provides?: string;
  contentType?: string;
  outputEncoding?: string;
}];
```
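
Because each message type pairs a specific code with a specific payload shape, a union of these tuples can be narrowed by checking the first element. The sketch below illustrates that idea with a local, partial copy of the enum; `Code`, `KnownMessage`, and `summarize` are hypothetical names for this example and are not part of the package.

```typescript
// Local subset of the codes listed above, so the example is self-contained.
enum Code {
  MONITORING = 8,
  ALIVE = 11,
  EVENT = 12
}

type HealthMsg = [Code.MONITORING, { healthy: boolean }];
type KeepAliveMsg = [Code.ALIVE, { keepAlive: number }];
type EventMsg = [Code.EVENT, { eventName: string; message?: unknown }];

// Union of tuples discriminated by the code in position 0.
type KnownMessage = HealthMsg | KeepAliveMsg | EventMsg;

// Switching on message[0] narrows message[1] to the matching payload type.
function summarize(message: KnownMessage): string {
  switch (message[0]) {
    case Code.MONITORING:
      return `health: ${message[1].healthy ? "ok" : "failing"}`;
    case Code.ALIVE:
      return `keep alive for ${message[1].keepAlive} ms`;
    case Code.EVENT:
      return `event: ${message[1].eventName}`;
  }
}

console.log(summarize([Code.MONITORING, { healthy: true }])); // "health: ok"
console.log(summarize([Code.ALIVE, { keepAlive: 5000 }]));    // "keep alive for 5000 ms"
```

Typing payloads this way catches mismatched code and data pairs at compile time, which the looser `[RunnerMessageCode, any]` tuple cannot do.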

**Advanced Usage Example:**

```typescript
import { MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";
// Assumption: these types are exported from @scramjet/types
import type { EncodedMonitoringMessage, WritableStream } from "@scramjet/types";

// Wraps a monitor stream and exposes one method per message type
class SequenceMonitor {
  constructor(private monitorStream: WritableStream<any>) {}

  reportHealth(healthy: boolean, details?: any) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.MONITORING,
      { healthy, ...details }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }

  sendKeepAlive(duration: number) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.ALIVE,
      { keepAlive: duration }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }

  emitEvent(eventName: string, data?: any) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.EVENT,
      { eventName, message: data }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }

  reportCompletion(timeout: number = 10000) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.SEQUENCE_COMPLETED,
      { timeout }
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }

  sendPang(requirements: {
    requires?: string;
    provides?: string;
    contentType?: string;
    outputEncoding?: string;
  }) {
    const message: EncodedMonitoringMessage = [
      RunnerMessageCode.PANG,
      requirements
    ];
    MessageUtils.writeMessageOnStream(message, this.monitorStream);
  }
}

// Usage (`hostClient` is assumed to be an already initialized host client)
const monitor = new SequenceMonitor(hostClient.monitorStream);
monitor.reportHealth(true, { memoryUsage: process.memoryUsage() });
monitor.sendKeepAlive(30000);
monitor.emitEvent("data-processed", { count: 100 });
```

## Error Handling

The `writeMessageOnStream` method validates the stream parameter and throws an error if it's undefined:

```typescript
// This will throw: "The Stream is not defined."
try {
  MessageUtils.writeMessageOnStream(
    [RunnerMessageCode.MONITORING, { healthy: true }],
    undefined as any
  );
} catch (error) {
  // `error` is typed `unknown` under strict TypeScript settings, so narrow it first
  if (error instanceof Error) {
    console.error(error.message); // "The Stream is not defined."
  }
}
```
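
In code paths where a missing stream should not abort the sequence, the throwing call can be wrapped so that failure is reported as a boolean. The following is a minimal sketch under stated assumptions: `writeMessageOnStream` here is a local stand-in modelled on the behaviour described in this document (validate the stream, then write the CRLF-terminated JSON), and `trySend` and `MinimalWritable` are hypothetical names, not part of `@scramjet/runner`.

```typescript
// Minimal stream shape needed for the example.
interface MinimalWritable {
  write(chunk: string): boolean;
}

type Encoded = [number, unknown];

// Stand-in modelled on the documented behaviour; not the library source.
function writeMessageOnStream(message: Encoded, stream?: MinimalWritable): void {
  if (stream === undefined) {
    throw new Error("The Stream is not defined.");
  }
  stream.write(JSON.stringify(message) + "\r\n");
}

// Hypothetical helper: returns false instead of throwing when the write fails.
function trySend(message: Encoded, stream?: MinimalWritable): boolean {
  try {
    writeMessageOnStream(message, stream);
    return true;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`Monitoring message ${message[0]} dropped: ${reason}`);
    return false;
  }
}

// In-memory stream that records what was written.
const sent: string[] = [];
const fakeStream: MinimalWritable = {
  write: (chunk) => {
    sent.push(chunk);
    return true;
  }
};

const delivered = trySend([8, { healthy: true }], fakeStream); // true
const dropped = trySend([8, { healthy: true }], undefined);    // false, logs a warning
```

Swallowing the error this way suits best-effort telemetry such as health reports; for messages the host depends on, letting the exception propagate is usually the safer choice.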

## Supporting Types

```typescript { .api }
type EncodedMonitoringMessage = [RunnerMessageCode, any];

interface WritableStream<T> {
  write(chunk: T): boolean;
}

interface RunnerMessageCode {
  PING: 0;
  PONG: 1;
  KILL: 6;
  STOP: 7;
  MONITORING: 8;
  MONITORING_RATE: 9;
  MONITORING_REPLY: 10;
  ALIVE: 11;
  EVENT: 12;
  SEQUENCE_STOPPED: 13;
  SEQUENCE_COMPLETED: 14;
  PANG: 15;
  ACKNOWLEDGE: 16;
}
```