or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md queuing-strategies.md readable-streams.md transform-streams.md writable-streams.md

transform-streams.md docs/

0

# Transform Streams

1

2

A transform stream connects a writable side to a readable side, allowing data to be transformed as it flows from one to the other.

3

4

## Capabilities

5

6

### TransformStream Class

7

8

A transform stream consists of a pair of streams: a writable stream (writable side) and a readable stream (readable side). Writes to the writable side result in new data being made available for reading from the readable side.

9

10

```typescript { .api }

11

/**

12

* A transform stream consists of a writable stream and a readable stream connected together

13

*/

14

class TransformStream<I = any, O = any> {

15

constructor(

16

transformer?: Transformer<I, O>,

17

writableStrategy?: QueuingStrategy<I>,

18

readableStrategy?: QueuingStrategy<O>

19

);

20

21

/** The readable side of the transform stream */

22

readonly readable: ReadableStream<O>;

23

24

/** The writable side of the transform stream */

25

readonly writable: WritableStream<I>;

26

}

27

```

28

29

**Usage Examples:**

30

31

```typescript

32

import { TransformStream } from "web-streams-polyfill";

33

34

// Create a transform stream that converts text to uppercase

35

const upperCaseTransform = new TransformStream({

36

transform(chunk, controller) {

37

controller.enqueue(chunk.toString().toUpperCase());

38

}

39

});

40

41

// Create a transform stream that filters out empty lines

42

const filterEmptyLines = new TransformStream({

43

transform(chunk, controller) {

44

const line = chunk.toString().trim();

45

if (line.length > 0) {

46

controller.enqueue(line);

47

}

48

}

49

});

50

51

// Create a transform stream that adds line numbers

52

let lineNumber = 1;

53

const addLineNumbers = new TransformStream({

54

start(controller) {

55

lineNumber = 1;

56

},

57

transform(chunk, controller) {

58

const line = chunk.toString();

59

controller.enqueue(`${lineNumber++}: ${line}`);

60

}

61

});

62

63

// Chain transforms together

64

await inputStream

65

.pipeThrough(filterEmptyLines)

66

.pipeThrough(upperCaseTransform)

67

.pipeThrough(addLineNumbers)

68

.pipeTo(outputStream);

69

```

70

71

### TransformStreamDefaultController

72

73

Controller passed to a transformer's `start`, `transform`, and `flush` callbacks. It enqueues chunks onto the transform stream's readable side and can error or terminate the stream.

74

75

```typescript { .api }

76

/**

77

* Controller for transform streams that manages the readable side

78

*/

79

class TransformStreamDefaultController<O> {

80

/** Returns the desired size to fill the internal queue of the controlled transform stream's readable side */

81

readonly desiredSize: number | null;

82

83

/** Enqueue a chunk to the controlled transform stream's readable side */

84

enqueue(chunk: O): void;

85

86

/** Error both sides of the controlled transform stream */

87

error(reason?: any): void;

88

89

/** Close the controlled transform stream's readable side and error the writable side */

90

terminate(): void;

91

}

92

```

93

94

**Usage Examples:**

95

96

```typescript

97

import { TransformStream } from "web-streams-polyfill";

98

99

// Transform stream that splits input into multiple chunks

100

const splitterTransform = new TransformStream({

101

transform(chunk, controller) {

102

const text = chunk.toString();

103

const words = text.split(' ');

104

105

// Enqueue each word as a separate chunk

106

for (const word of words) {

107

if (word.trim()) {

108

controller.enqueue(word.trim());

109

}

110

}

111

}

112

});

113

114

// Transform stream with error handling

115

const validationTransform = new TransformStream({

116

transform(chunk, controller) {

117

try {

118

const data = JSON.parse(chunk.toString());

119

120

if (!data.id) {

121

controller.error(new Error("Missing required 'id' field"));

122

return;

123

}

124

125

controller.enqueue(data);

126

} catch (error) {

127

controller.error(new Error(`Invalid JSON: ${error.message}`));

128

}

129

}

130

});

131

132

// Transform stream that terminates early: passes the first n chunks through, then calls terminate() when the next chunk arrives

133

const takeFirstN = (n: number) => {

134

let count = 0;

135

return new TransformStream({

136

transform(chunk, controller) {

137

if (count < n) {

138

controller.enqueue(chunk);

139

count++;

140

} else {

141

controller.terminate(); // Stop processing more chunks

142

}

143

}

144

});

145

};

146

```

147

148

## Transformer Types

149

150

### Transformer Interface

151

152

Configuration object for transform streams that defines how data is transformed.

153

154

```typescript { .api }

155

interface Transformer<I = any, O = any> {

156

/** Called immediately during construction of the TransformStream */

157

start?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;

158

159

/** Called when a new chunk of data is ready to be transformed */

160

transform?: (chunk: I, controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;

161

162

/** Called after all chunks written to the writable side have been transformed */

163

flush?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;

164

165

/** Called when the readable side is cancelled or the writable side is aborted */

166

cancel?: (reason: any) => void | PromiseLike<void>;

167

168

/** Must be undefined for default transform streams */

169

readableType?: undefined;

170

171

/** Must be undefined for default transform streams */

172

writableType?: undefined;

173

}

174

```

175

176

**Usage Examples:**

177

178

```typescript

179

import { TransformStream } from "web-streams-polyfill";

180

181

// JSON processing transform stream

182

const jsonProcessor = new TransformStream({

183

start(controller) {

184

console.log("Starting JSON processing");

185

this.buffer = '';

186

},

187

188

transform(chunk, controller) {

189

this.buffer += chunk.toString();

190

191

// Process complete JSON objects (naive brace counting: braces inside string values are not handled)

192

let startIndex = 0;

193

let braceCount = 0;

194

195

for (let i = 0; i < this.buffer.length; i++) {

196

if (this.buffer[i] === '{') braceCount++;

197

if (this.buffer[i] === '}') braceCount--;

198

199

if (braceCount === 0 && i > startIndex) {

200

const jsonStr = this.buffer.slice(startIndex, i + 1);

201

try {

202

const obj = JSON.parse(jsonStr);

203

controller.enqueue(obj);

204

} catch (error) {

205

controller.error(new Error(`Invalid JSON: ${error.message}`));

206

return;

207

}

208

startIndex = i + 1;

209

}

210

}

211

212

// Keep remaining incomplete JSON in buffer

213

this.buffer = this.buffer.slice(startIndex);

214

},

215

216

flush(controller) {

217

if (this.buffer.trim()) {

218

controller.error(new Error("Incomplete JSON at end of stream"));

219

}

220

console.log("JSON processing completed");

221

}

222

});

223

224

// Compression transform stream

225

const compressionTransform = new TransformStream({

226

start(controller) {

227

this.chunks = [];

228

},

229

230

transform(chunk, controller) {

231

// Collect chunks for batch compression

232

this.chunks.push(chunk);

233

234

// Flush when we have enough data

235

if (this.chunks.length >= 10) {

236

const combined = this.chunks.join('');

237

this.chunks = [];

238

239

// Simulate compression

240

const compressed = `compressed(${combined})`;

241

controller.enqueue(compressed);

242

}

243

},

244

245

flush(controller) {

246

// Flush remaining chunks

247

if (this.chunks.length > 0) {

248

const combined = this.chunks.join('');

249

const compressed = `compressed(${combined})`;

250

controller.enqueue(compressed);

251

}

252

}

253

});

254

255

// Rate limiting transform stream

256

const rateLimiter = (itemsPerSecond: number) => {

257

let lastTime = Date.now();

258

const interval = 1000 / itemsPerSecond;

259

260

return new TransformStream({

261

async transform(chunk, controller) {

262

const now = Date.now();

263

const timeDiff = now - lastTime;

264

265

if (timeDiff < interval) {

266

// Wait to maintain rate limit

267

await new Promise(resolve =>

268

setTimeout(resolve, interval - timeDiff)

269

);

270

}

271

272

controller.enqueue(chunk);

273

lastTime = Date.now();

274

}

275

});

276

};

277

```

278

279

## Callback Types

280

281

```typescript { .api }

282

type TransformerStartCallback<O> = (

283

controller: TransformStreamDefaultController<O>

284

) => void | PromiseLike<void>;

285

286

type TransformerTransformCallback<I, O> = (

287

chunk: I,

288

controller: TransformStreamDefaultController<O>

289

) => void | PromiseLike<void>;

290

291

type TransformerFlushCallback<O> = (

292

controller: TransformStreamDefaultController<O>

293

) => void | PromiseLike<void>;

294

295

type TransformerCancelCallback = (reason: any) => void | PromiseLike<void>;

296

```

297

298

## Common Transform Patterns

299

300

### Identity Transform

301

302

```typescript

303

// Pass through transform (no modification)

304

const identityTransform = new TransformStream();

305

306

// Equivalent to the following (an alternative to the declaration above; use one or the other, not both):

307

const identityTransform = new TransformStream({

308

transform(chunk, controller) {

309

controller.enqueue(chunk);

310

}

311

});

312

```

313

314

### Buffering Transform

315

316

```typescript

317

// Buffer chunks and emit arrays

318

const bufferTransform = (bufferSize: number) => new TransformStream({

319

start(controller) {

320

this.buffer = [];

321

},

322

323

transform(chunk, controller) {

324

this.buffer.push(chunk);

325

326

if (this.buffer.length >= bufferSize) {

327

controller.enqueue([...this.buffer]);

328

this.buffer = [];

329

}

330

},

331

332

flush(controller) {

333

if (this.buffer.length > 0) {

334

controller.enqueue([...this.buffer]);

335

}

336

}

337

});

338

```

339

340

### Async Transform

341

342

```typescript

343

// Transform with async operations

344

const asyncTransform = new TransformStream({

345

async transform(chunk, controller) {

346

try {

347

// Simulate async processing (e.g., API call)

348

const result = await processAsync(chunk);

349

controller.enqueue(result);

350

} catch (error) {

351

controller.error(error);

352

}

353

}

354

});

355

356

async function processAsync(data: any): Promise<any> {

357

// Simulate async work

358

await new Promise(resolve => setTimeout(resolve, 100));

359

return { processed: data, timestamp: Date.now() };

360

}

361

```