# Message Utilities

The `MessageUtils` class provides utility functions for encoding and transmitting monitoring messages through streams in the Scramjet Transform Hub communication protocol.

## Capabilities

### MessageUtils Class

Static utility class for handling message encoding and stream writing in the Transform Hub communication protocol.

```typescript { .api }
/**
 * Utility class for stream messaging operations
 */
class MessageUtils {
    /**
     * Write encoded monitoring message to a stream with proper formatting
     * @param message - Encoded monitoring message as [code, data] tuple
     * @param streamToWrite - Writable stream to send the message to
     * @throws Error if streamToWrite is undefined
     */
    static writeMessageOnStream(
        message: EncodedMonitoringMessage,
        streamToWrite: WritableStream<any>
    ): void;
}
```

**Usage Example:**

```typescript
import { MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";
// Assumption: the tuple type is exported by @scramjet/types
import type { EncodedMonitoringMessage } from "@scramjet/types";

// `hostClient` is assumed to be provided by the runner and to expose `monitorStream`

// Send health monitoring message
const healthMessage: EncodedMonitoringMessage = [
    RunnerMessageCode.MONITORING,
    { healthy: true, timestamp: Date.now() }
];

MessageUtils.writeMessageOnStream(healthMessage, hostClient.monitorStream);

// Send keep-alive message
const keepAliveMessage: EncodedMonitoringMessage = [
    RunnerMessageCode.ALIVE,
    { keepAlive: 5000 }
];

MessageUtils.writeMessageOnStream(keepAliveMessage, hostClient.monitorStream);

// Send sequence completion message
const completionMessage: EncodedMonitoringMessage = [
    RunnerMessageCode.SEQUENCE_COMPLETED,
    { timeout: 10000 }
];

MessageUtils.writeMessageOnStream(completionMessage, hostClient.monitorStream);
```

## Message Format

The Transform Hub communication protocol uses JSON-encoded messages with CRLF line termination:

- **Format**: `JSON.stringify([code, data]) + "\r\n"`
- **Structure**: Messages are tuples of `[MessageCode, MessageData]`
- **Encoding**: UTF-8 text encoding
- **Termination**: Messages end with carriage return + line feed (`\r\n`)
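
The framing rules above can be sketched in a few lines. This is an illustration only, not the library's code: `encodeFrame` and `decodeFrames` are hypothetical helper names, and the numeric codes 8 and 11 are the `MONITORING` and `ALIVE` values listed in this document.

```typescript
// Hypothetical helpers illustrating the wire format described above.
type Frame = [number, unknown];

// Serialize one message as a JSON tuple followed by CRLF.
function encodeFrame(code: number, data: unknown): string {
    return JSON.stringify([code, data]) + "\r\n";
}

// Split received text on CRLF and parse each frame.
// Assumes the text contains only whole frames; a real reader would
// have to buffer a trailing partial line until its CRLF arrives.
function decodeFrames(text: string): Frame[] {
    return text
        .split("\r\n")
        .filter((line) => line.length > 0)
        .map((line) => JSON.parse(line) as Frame);
}

// Two frames back to back, as they would appear on the stream.
const wire = encodeFrame(8, { healthy: true }) + encodeFrame(11, { keepAlive: 5000 });
const decoded = decodeFrames(wire);
```

Because `JSON.stringify` escapes line breaks inside strings, a raw CRLF should only ever appear at a frame boundary, which is what makes the line-based split safe in this sketch.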

## Common Message Types

```typescript { .api }
enum RunnerMessageCode {
    PING = 0,
    PONG = 1,
    KILL = 6,
    STOP = 7,
    MONITORING = 8,
    MONITORING_RATE = 9,
    MONITORING_REPLY = 10,
    ALIVE = 11,
    EVENT = 12,
    SEQUENCE_STOPPED = 13,
    SEQUENCE_COMPLETED = 14,
    PANG = 15,
    ACKNOWLEDGE = 16
}

// Health monitoring message
type HealthMessage = [RunnerMessageCode.MONITORING, { healthy: boolean }];

// Keep-alive message
type KeepAliveMessage = [RunnerMessageCode.ALIVE, { keepAlive: number }];

// Event message
type EventMessage = [RunnerMessageCode.EVENT, { eventName: string; message?: any }];

// Sequence completion message
type CompletionMessage = [RunnerMessageCode.SEQUENCE_COMPLETED, { timeout: number }];

// PANG message (input/output requirements)
type PangMessage = [RunnerMessageCode.PANG, {
    requires?: string;
    provides?: string;
    contentType?: string;
    outputEncoding?: string;
}];
```
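
Since the first tuple element identifies the payload shape, the tuple types above can be treated as a discriminated union on the receiving side. The following is a minimal sketch under stated assumptions: `Code` is a local stand-in for a subset of `RunnerMessageCode` (values as listed in this document) so the block runs without `@scramjet/symbols`, and `summarize` is a hypothetical function, not part of the package.

```typescript
// Local stand-in for a subset of RunnerMessageCode (assumption: values as listed above).
enum Code {
    MONITORING = 8,
    ALIVE = 11,
    EVENT = 12,
}

type HealthMsg = [Code.MONITORING, { healthy: boolean }];
type KeepAliveMsg = [Code.ALIVE, { keepAlive: number }];
type EventMsg = [Code.EVENT, { eventName: string; message?: unknown }];
type KnownMsg = HealthMsg | KeepAliveMsg | EventMsg;

// Hypothetical helper: switching on the code narrows the payload type in each branch.
function summarize(msg: KnownMsg): string {
    switch (msg[0]) {
        case Code.MONITORING:
            return msg[1].healthy ? "healthy" : "unhealthy";
        case Code.ALIVE:
            return `keep alive for ${msg[1].keepAlive} ms`;
        case Code.EVENT:
            return `event ${msg[1].eventName}`;
    }
}

const summary = summarize([Code.ALIVE, { keepAlive: 5000 }]);
```

Typing the payloads per code, rather than as `any`, lets the compiler flag a handler that reads a field the message does not carry.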

**Advanced Usage Example:**

```typescript
import { MessageUtils } from "@scramjet/runner";
import { RunnerMessageCode } from "@scramjet/symbols";
// Assumption: the tuple type is exported by @scramjet/types
import type { EncodedMonitoringMessage } from "@scramjet/types";

class SequenceMonitor {
    constructor(private monitorStream: WritableStream<any>) {}

    reportHealth(healthy: boolean, details?: any) {
        const message: EncodedMonitoringMessage = [
            RunnerMessageCode.MONITORING,
            { healthy, ...details }
        ];
        MessageUtils.writeMessageOnStream(message, this.monitorStream);
    }

    sendKeepAlive(duration: number) {
        const message: EncodedMonitoringMessage = [
            RunnerMessageCode.ALIVE,
            { keepAlive: duration }
        ];
        MessageUtils.writeMessageOnStream(message, this.monitorStream);
    }

    emitEvent(eventName: string, data?: any) {
        const message: EncodedMonitoringMessage = [
            RunnerMessageCode.EVENT,
            { eventName, message: data }
        ];
        MessageUtils.writeMessageOnStream(message, this.monitorStream);
    }

    reportCompletion(timeout: number = 10000) {
        const message: EncodedMonitoringMessage = [
            RunnerMessageCode.SEQUENCE_COMPLETED,
            { timeout }
        ];
        MessageUtils.writeMessageOnStream(message, this.monitorStream);
    }

    sendPang(requirements: {
        requires?: string;
        provides?: string;
        contentType?: string;
        outputEncoding?: string;
    }) {
        const message: EncodedMonitoringMessage = [
            RunnerMessageCode.PANG,
            requirements
        ];
        MessageUtils.writeMessageOnStream(message, this.monitorStream);
    }
}

// Usage (`hostClient` is assumed to be provided by the runner)
const monitor = new SequenceMonitor(hostClient.monitorStream);
monitor.reportHealth(true, { memoryUsage: process.memoryUsage() });
monitor.sendKeepAlive(30000);
monitor.emitEvent("data-processed", { count: 100 });
```

## Error Handling

The `writeMessageOnStream` method checks the stream argument and throws an `Error` with the message `The Stream is not defined.` if it is `undefined`:

```typescript
// This will throw: "The Stream is not defined."
try {
    MessageUtils.writeMessageOnStream(
        [RunnerMessageCode.MONITORING, { healthy: true }],
        undefined as any
    );
} catch (error) {
    // `error` is `unknown` under strict settings, so narrow it before reading `.message`
    console.error(error instanceof Error ? error.message : error); // "The Stream is not defined."
}
```
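
Where the stream may legitimately be missing, for example before the connection to the host is established, a caller might prefer a boolean result over an exception. The sketch below is one possible shape, not part of the package: `Sink`, `writeFrame` and `tryWrite` are hypothetical names, and `writeFrame` is a local stand-in that reproduces only the behavior documented above so the block runs on its own.

```typescript
// Minimal stream shape used by this sketch (hypothetical name).
interface Sink {
    write(chunk: string): boolean;
}

// Stand-in for the documented behavior: throw if the stream is undefined,
// otherwise write the JSON tuple followed by CRLF.
function writeFrame(message: [number, unknown], sink?: Sink): void {
    if (sink === undefined) {
        throw new Error("The Stream is not defined.");
    }
    sink.write(JSON.stringify(message) + "\r\n");
}

// Hypothetical wrapper: logs the error and returns false instead of throwing.
function tryWrite(message: [number, unknown], sink?: Sink): boolean {
    try {
        writeFrame(message, sink);
        return true;
    } catch (error) {
        console.error(error instanceof Error ? error.message : String(error));
        return false;
    }
}

// An in-memory sink that records what was written.
const written: string[] = [];
const memorySink: Sink = {
    write: (chunk) => {
        written.push(chunk);
        return true;
    },
};

const sent = tryWrite([8, { healthy: true }], memorySink);
const dropped = tryWrite([8, { healthy: true }], undefined);
```

Whether to swallow the error this way depends on the caller; for health reporting a dropped message may be acceptable, while for completion messages rethrowing is probably the safer choice.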

## Supporting Types

```typescript { .api }
type EncodedMonitoringMessage = [RunnerMessageCode, any];

interface WritableStream<T> {
    write(chunk: T): boolean;
}

interface RunnerMessageCode {
    PING: 0;
    PONG: 1;
    KILL: 6;
    STOP: 7;
    MONITORING: 8;
    MONITORING_RATE: 9;
    MONITORING_REPLY: 10;
    ALIVE: 11;
    EVENT: 12;
    SEQUENCE_STOPPED: 13;
    SEQUENCE_COMPLETED: 14;
    PANG: 15;
    ACKNOWLEDGE: 16;
}
```