0
# Stream Processing
1
2
Stream processing utilities for handling input streams with different content types and formats, including header parsing and data stream mapping.
3
4
## Core Imports
5
6
```typescript
7
import {
8
readInputStreamHeaders,
9
mapToInputDataStream,
10
inputStreamInitLogger
11
} from "@scramjet/runner/src/input-stream";
12
```
13
14
## Capabilities
15
16
### Input Stream Header Reading
17
18
Reads HTTP-style headers from input streams to determine content type and other metadata.
19
20
```typescript { .api }
21
/**
22
* Read HTTP-style headers from a readable stream
23
* @param stream - Readable stream containing headers followed by data
24
* @returns Promise resolving to object with header key/values (header names are lowercase)
25
*/
26
function readInputStreamHeaders(stream: Readable): Promise<Record<string, string>>;
27
```
28
29
**Usage Example:**
30
31
```typescript
32
import { readInputStreamHeaders } from "@scramjet/runner/src/input-stream";
33
34
// Read headers from input stream
35
const headers = await readInputStreamHeaders(inputStream);
36
console.log(headers);
37
// Output: { "content-type": "application/x-ndjson", "content-length": "1024" }
38
39
// Use content type for stream processing
40
const contentType = headers["content-type"];
41
const dataStream = mapToInputDataStream(inputStream, contentType);
42
```
43
44
The function expects headers in HTTP format with CRLF line endings and a blank line separating headers from body:
45
46
```
47
Content-Type: application/x-ndjson\r\n
48
Content-Length: 1024\r\n
49
\r\n
50
[actual data starts here]
51
```
52
53
### Input Data Stream Mapping
54
55
Maps input streams to appropriate DataStream types based on content type.
56
57
```typescript { .api }
58
/**
59
* Map input stream to DataStream based on content type
60
* @param stream - Readable stream containing data
61
* @param contentType - Content type string determining stream processing
62
* @returns DataStream configured for the specified content type
63
* @throws Error if contentType is undefined or unsupported
64
*/
65
function mapToInputDataStream(stream: Readable, contentType: string): DataStream;
66
```
67
68
**Supported Content Types:**
69
70
- `application/x-ndjson`: Newline-delimited JSON (parsed as JSON objects)
71
- `text/x-ndjson`: Newline-delimited JSON (parsed as JSON objects)
72
- `text/plain`: Plain text (UTF-8 encoded StringStream)
73
- `application/octet-stream`: Binary data (BufferStream)
74
75
**Usage Example:**
76
77
```typescript
78
import { mapToInputDataStream } from "@scramjet/runner/src/input-stream";
79
80
// Process JSON data
81
const jsonStream = mapToInputDataStream(inputStream, "application/x-ndjson");
82
jsonStream.each(obj => {
83
console.log("Received object:", obj);
84
});
85
86
// Process plain text
87
const textStream = mapToInputDataStream(inputStream, "text/plain");
88
textStream.each(line => {
89
console.log("Text line:", line);
90
});
91
92
// Process binary data
93
const binaryStream = mapToInputDataStream(inputStream, "application/octet-stream");
94
binaryStream.each(buffer => {
95
console.log("Binary chunk:", buffer);
96
});
97
98
// Handle unsupported content type
99
try {
100
const unsupportedStream = mapToInputDataStream(inputStream, "image/png");
101
} catch (error) {
102
console.error(error.message);
103
// "Content-Type does not match any supported value. The actual value is image/png"
104
}
105
```
106
107
### Input Stream Logger
108
109
Logger instance for input stream initialization and processing.
110
111
```typescript { .api }
112
/**
113
* Logger instance for input stream initialization
114
*/
115
const inputStreamInitLogger: IObjectLogger;
116
```
117
118
**Usage Example:**
119
120
```typescript
121
import { inputStreamInitLogger } from "@scramjet/runner/src/input-stream";
122
123
// The logger is automatically used by stream processing functions
124
// You can also access it directly for custom logging
125
inputStreamInitLogger.debug("Processing input stream", { contentType: "application/json" });
126
```
127
128
## Advanced Usage
129
130
### Complete Stream Processing Pipeline
131
132
```typescript
133
import {
134
readInputStreamHeaders,
135
mapToInputDataStream,
136
inputStreamInitLogger
137
} from "@scramjet/runner/src/input-stream";
138
139
/**
 * End-to-end example: read the header section of an input stream, pick the
 * matching DataStream for its Content-Type, and attach a type-specific
 * transformation.
 *
 * @param inputStream - Readable stream containing headers followed by data
 * @returns A stream of transformed items; the item shape depends on the content type
 * @throws Error when the Content-Type header is missing or unsupported; the
 *         error is logged through `inputStreamInitLogger` before being rethrown
 */
async function processInputStream(inputStream: Readable) {
  try {
    // The header section tells us how the body has to be decoded
    const parsedHeaders = await readInputStreamHeaders(inputStream);

    inputStreamInitLogger.debug("Headers received", parsedHeaders);

    const contentType = parsedHeaders["content-type"];

    if (!contentType) {
      throw new Error("Content-Type header is required");
    }

    // Pick the stream implementation that matches the declared type
    const payload = mapToInputDataStream(inputStream, contentType);

    // Newline-delimited JSON: annotate every parsed object
    if (contentType === "application/x-ndjson" || contentType === "text/x-ndjson") {
      return payload.map(record => ({
        ...record,
        processed: true,
        timestamp: Date.now()
      }));
    }

    // Plain text: split into lines, drop blank ones, report each line's length
    if (contentType === "text/plain") {
      const lines = payload.split("\n");
      const nonBlankLines = lines.filter(text => text.trim().length > 0);

      return nonBlankLines.map(text => ({ text: text, length: text.length }));
    }

    // Binary: summarize each chunk by its size and a simple byte-sum checksum
    if (contentType === "application/octet-stream") {
      return payload.map(chunk => ({
        size: chunk.length,
        checksum: chunk.reduce((total, value) => total + value, 0)
      }));
    }

    throw new Error(`Unsupported content type: ${contentType}`);
  } catch (failure) {
    // Log with context, then let the caller decide how to recover
    inputStreamInitLogger.error("Stream processing failed", failure);
    throw failure;
  }
}
183
```
184
185
### Custom Content Type Handling
186
187
```typescript
188
/**
 * Map a stream to a DataStream, treating any content type outside the
 * supported set as raw binary instead of failing.
 *
 * @param stream - Readable stream containing data
 * @param contentType - Content type declared for the stream
 * @returns The DataStream for `contentType`, or the `application/octet-stream`
 *          mapping (after logging a warning) when the type is not supported
 */
function processCustomContentType(stream: Readable, contentType: string) {
  const supportedTypes = [
    "application/x-ndjson",
    "text/x-ndjson",
    "text/plain",
    "application/octet-stream"
  ];

  if (supportedTypes.includes(contentType)) {
    return mapToInputDataStream(stream, contentType);
  }

  // Unknown type: warn and fall back to binary processing
  inputStreamInitLogger.warn(`Unsupported content type ${contentType}, treating as binary`);

  return mapToInputDataStream(stream, "application/octet-stream");
}
197
```
198
199
### Error Handling
200
201
```typescript
202
/**
 * Read the stream headers and map the stream to a DataStream, falling back to
 * binary processing when the declared Content-Type is not supported.
 *
 * @param inputStream - Readable stream containing headers followed by data
 * @returns The DataStream for the declared content type, or the
 *          `application/octet-stream` mapping when the type is unsupported
 * @throws Error when the Content-Type header is missing, or any other error
 *         raised while reading headers or mapping the stream (logged first)
 */
async function safeStreamProcessing(inputStream: Readable) {
  // Declared outside `try` so the catch block can include it in the log entry;
  // a `const` inside the try block would be out of scope there.
  let contentType: string | undefined;

  try {
    const headers = await readInputStreamHeaders(inputStream);

    contentType = headers["content-type"];

    if (!contentType) {
      inputStreamInitLogger.error("Missing Content-Type header");
      throw new Error("Content-Type header is required");
    }

    return mapToInputDataStream(inputStream, contentType);
  } catch (error) {
    // Narrow before touching `.message`: the caught value is not guaranteed to be an Error
    if (error instanceof Error && error.message.includes("Content-Type does not match")) {
      inputStreamInitLogger.error("Unsupported content type", { contentType });

      // Fallback to binary processing
      return mapToInputDataStream(inputStream, "application/octet-stream");
    }

    inputStreamInitLogger.error("Stream processing error", error);
    throw error;
  }
}
226
```
227
228
## Supporting Types
229
230
```typescript { .api }
231
interface Readable {
232
read(): Buffer | null;
233
on(event: string, callback: Function): this;
234
off(event: string, callback: Function): this;
235
unshift(chunk: Buffer): void;
236
}
237
238
interface DataStream {
239
map<U>(fn: (item: any) => U): DataStream;
240
filter(fn: (item: any) => boolean): DataStream;
241
each(fn: (item: any) => void): DataStream;
242
pipe(destination: any): any;
243
}
244
245
interface StringStream extends DataStream {
246
split(separator: string): StringStream;
247
JSONParse(multi?: boolean): DataStream;
248
}
249
250
interface BufferStream extends DataStream {
251
// Buffer-specific stream methods
252
}
253
254
interface IObjectLogger {
255
debug(message: string, data?: any): void;
256
info(message: string, data?: any): void;
257
warn(message: string, data?: any): void;
258
error(message: string, data?: any): void;
259
}
260
```