or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-context.mdhost-communication.mdindex.mdmessage-utilities.mdrunner-execution.mdstream-processing.md

stream-processing.mddocs/

0

# Stream Processing

1

2

Stream processing utilities for handling input streams with different content types and formats, including header parsing and data stream mapping.

3

4

## Core Imports

5

6

```typescript

7

import {

8

readInputStreamHeaders,

9

mapToInputDataStream,

10

inputStreamInitLogger

11

} from "@scramjet/runner/src/input-stream";

12

```

13

14

## Capabilities

15

16

### Input Stream Header Reading

17

18

Reads HTTP-style headers from input streams to determine content type and other metadata.

19

20

```typescript { .api }

21

/**

22

* Read HTTP-style headers from a readable stream

23

* @param stream - Readable stream containing headers followed by data

24

* @returns Promise resolving to object with header key/values (header names are lowercase)

25

*/

26

function readInputStreamHeaders(stream: Readable): Promise<Record<string, string>>;

27

```

28

29

**Usage Example:**

30

31

```typescript

32

import { readInputStreamHeaders } from "@scramjet/runner/src/input-stream";

33

34

// Read headers from input stream

35

const headers = await readInputStreamHeaders(inputStream);

36

console.log(headers);

37

// Output: { "content-type": "application/x-ndjson", "content-length": "1024" }

38

39

// Use content type for stream processing

40

const contentType = headers["content-type"];

41

const dataStream = mapToInputDataStream(inputStream, contentType);

42

```

43

44

The function expects headers in HTTP format with CRLF line endings and a blank line separating headers from body:

45

46

```

47

Content-Type: application/x-ndjson\r\n

48

Content-Length: 1024\r\n

49

\r\n

50

[actual data starts here]

51

```

52

53

### Input Data Stream Mapping

54

55

Maps input streams to appropriate DataStream types based on content type.

56

57

```typescript { .api }

58

/**

59

* Map input stream to DataStream based on content type

60

* @param stream - Readable stream containing data

61

* @param contentType - Content type string determining stream processing

62

* @returns DataStream configured for the specified content type

63

* @throws Error if contentType is undefined or unsupported

64

*/

65

function mapToInputDataStream(stream: Readable, contentType: string): DataStream;

66

```

67

68

**Supported Content Types:**

69

70

- `application/x-ndjson`: Newline-delimited JSON (parsed as JSON objects)

71

- `text/x-ndjson`: Newline-delimited JSON (parsed as JSON objects)

72

- `text/plain`: Plain text (UTF-8 encoded StringStream)

73

- `application/octet-stream`: Binary data (BufferStream)

74

75

**Usage Example:**

76

77

```typescript

78

import { mapToInputDataStream } from "@scramjet/runner/src/input-stream";

79

80

// Process JSON data

81

const jsonStream = mapToInputDataStream(inputStream, "application/x-ndjson");

82

jsonStream.each(obj => {

83

console.log("Received object:", obj);

84

});

85

86

// Process plain text

87

const textStream = mapToInputDataStream(inputStream, "text/plain");

88

textStream.each(line => {

89

console.log("Text line:", line);

90

});

91

92

// Process binary data

93

const binaryStream = mapToInputDataStream(inputStream, "application/octet-stream");

94

binaryStream.each(buffer => {

95

console.log("Binary chunk:", buffer);

96

});

97

98

// Handle unsupported content type

99

try {

100

const unsupportedStream = mapToInputDataStream(inputStream, "image/png");

101

} catch (error) {

102

console.error(error.message);

103

// "Content-Type does not match any supported value. The actual value is image/png"

104

}

105

```

106

107

### Input Stream Logger

108

109

Logger instance for input stream initialization and processing.

110

111

```typescript { .api }

112

/**

113

* Logger instance for input stream initialization

114

*/

115

const inputStreamInitLogger: IObjectLogger;

116

```

117

118

**Usage Example:**

119

120

```typescript

121

import { inputStreamInitLogger } from "@scramjet/runner/src/input-stream";

122

123

// The logger is automatically used by stream processing functions

124

// You can also access it directly for custom logging

125

inputStreamInitLogger.debug("Processing input stream", { contentType: "application/json" });

126

```

127

128

## Advanced Usage

129

130

### Complete Stream Processing Pipeline

131

132

```typescript

133

import {

134

readInputStreamHeaders,

135

mapToInputDataStream,

136

inputStreamInitLogger

137

} from "@scramjet/runner/src/input-stream";

138

139

async function processInputStream(inputStream: Readable) {

140

try {

141

// Read headers to determine content type

142

const headers = await readInputStreamHeaders(inputStream);

143

inputStreamInitLogger.debug("Headers received", headers);

144

145

const contentType = headers["content-type"];

146

if (!contentType) {

147

throw new Error("Content-Type header is required");

148

}

149

150

// Map to appropriate data stream

151

const dataStream = mapToInputDataStream(inputStream, contentType);

152

153

// Process based on content type

154

switch (contentType) {

155

case "application/x-ndjson":

156

case "text/x-ndjson":

157

return dataStream.map(obj => ({

158

...obj,

159

processed: true,

160

timestamp: Date.now()

161

}));

162

163

case "text/plain":

164

return dataStream

165

.split("\n")

166

.filter(line => line.trim().length > 0)

167

.map(line => ({ text: line, length: line.length }));

168

169

case "application/octet-stream":

170

return dataStream.map(buffer => ({

171

size: buffer.length,

172

checksum: buffer.reduce((sum, byte) => sum + byte, 0)

173

}));

174

175

default:

176

throw new Error(`Unsupported content type: ${contentType}`);

177

}

178

} catch (error) {

179

inputStreamInitLogger.error("Stream processing failed", error);

180

throw error;

181

}

182

}

183

```

184

185

### Custom Content Type Handling

186

187

```typescript

188

function processCustomContentType(stream: Readable, contentType: string) {

189

// For unsupported content types, fall back to binary processing

190

if (!["application/x-ndjson", "text/x-ndjson", "text/plain", "application/octet-stream"].includes(contentType)) {

191

inputStreamInitLogger.warn(`Unsupported content type ${contentType}, treating as binary`);

192

return mapToInputDataStream(stream, "application/octet-stream");

193

}

194

195

return mapToInputDataStream(stream, contentType);

196

}

197

```

198

199

### Error Handling

200

201

```typescript

202

async function safeStreamProcessing(inputStream: Readable) {

203

try {

204

const headers = await readInputStreamHeaders(inputStream);

205

const contentType = headers["content-type"];

206

207

if (!contentType) {

208

inputStreamInitLogger.error("Missing Content-Type header");

209

throw new Error("Content-Type header is required");

210

}

211

212

const dataStream = mapToInputDataStream(inputStream, contentType);

213

return dataStream;

214

215

} catch (error) {

216

if (error.message.includes("Content-Type does not match")) {

217

inputStreamInitLogger.error("Unsupported content type", { contentType });

218

// Fallback to binary processing

219

return mapToInputDataStream(inputStream, "application/octet-stream");

220

}

221

222

inputStreamInitLogger.error("Stream processing error", error);

223

throw error;

224

}

225

}

226

```

227

228

## Supporting Types

229

230

```typescript { .api }

231

interface Readable {

232

read(): Buffer | null;

233

on(event: string, callback: Function): this;

234

off(event: string, callback: Function): this;

235

unshift(chunk: Buffer): void;

236

}

237

238

interface DataStream {

239

map<U>(fn: (item: any) => U): DataStream;

240

filter(fn: (item: any) => boolean): DataStream;

241

each(fn: (item: any) => void): DataStream;

242

pipe(destination: any): any;

243

}

244

245

interface StringStream extends DataStream {

246

split(separator: string): StringStream;

247

JSONParse(multi?: boolean): DataStream;

248

}

249

250

interface BufferStream extends DataStream {

251

// Buffer-specific stream methods

252

}

253

254

interface IObjectLogger {

255

debug(message: string, data?: any): void;

256

info(message: string, data?: any): void;

257

warn(message: string, data?: any): void;

258

error(message: string, data?: any): void;

259

}

260

```