0
# Transform Streams
1
2
Transform streams connect a writable side to a readable side so that data written to the writable side can be transformed before it is read from the readable side.
3
4
## Capabilities
5
6
### TransformStream Class
7
8
A transform stream consists of a pair of streams: a writable stream (writable side) and a readable stream (readable side). Writes to the writable side result in new data being made available for reading from the readable side.
9
10
```typescript { .api }
11
/**
12
* A transform stream consists of a writable stream and a readable stream connected together
13
*/
14
class TransformStream<I = any, O = any> {
15
constructor(
16
transformer?: Transformer<I, O>,
17
writableStrategy?: QueuingStrategy<I>,
18
readableStrategy?: QueuingStrategy<O>
19
);
20
21
/** The readable side of the transform stream */
22
readonly readable: ReadableStream<O>;
23
24
/** The writable side of the transform stream */
25
readonly writable: WritableStream<I>;
26
}
27
```
28
29
**Usage Examples:**
30
31
```typescript
32
import { TransformStream } from "web-streams-polyfill";
33
34
// Create a transform stream that converts text to uppercase
35
const upperCaseTransform = new TransformStream({
36
transform(chunk, controller) {
37
controller.enqueue(chunk.toString().toUpperCase());
38
}
39
});
40
41
// Create a transform stream that filters out empty lines
42
const filterEmptyLines = new TransformStream({
43
transform(chunk, controller) {
44
const line = chunk.toString().trim();
45
if (line.length > 0) {
46
controller.enqueue(line);
47
}
48
}
49
});
50
51
// Create a transform stream that adds line numbers
52
let lineNumber = 1;
53
const addLineNumbers = new TransformStream({
54
start(controller) {
55
lineNumber = 1;
56
},
57
transform(chunk, controller) {
58
const line = chunk.toString();
59
controller.enqueue(`${lineNumber++}: ${line}`);
60
}
61
});
62
63
// Chain transforms together
64
await inputStream
65
.pipeThrough(filterEmptyLines)
66
.pipeThrough(upperCaseTransform)
67
.pipeThrough(addLineNumbers)
68
.pipeTo(outputStream);
69
```
70
71
### TransformStreamDefaultController
72
73
Controller passed to a transformer's `start`, `transform`, and `flush` callbacks. It enqueues chunks on the transform stream's readable side and can error or terminate the stream.
74
75
```typescript { .api }
76
/**
77
* Controller for transform streams that manages the readable side
78
*/
79
class TransformStreamDefaultController<O> {
80
/** Returns the desired size to fill the internal queue of the controlled transform stream's readable side */
81
readonly desiredSize: number | null;
82
83
/** Enqueue a chunk to the controlled transform stream's readable side */
84
enqueue(chunk: O): void;
85
86
/** Error both sides of the controlled transform stream */
87
error(reason?: any): void;
88
89
/** Close the controlled transform stream's readable side and error the writable side */
90
terminate(): void;
91
}
92
```
93
94
**Usage Examples:**
95
96
```typescript
97
import { TransformStream } from "web-streams-polyfill";
98
99
// Transform stream that splits input into multiple chunks
100
const splitterTransform = new TransformStream({
101
transform(chunk, controller) {
102
const text = chunk.toString();
103
const words = text.split(' ');
104
105
// Enqueue each word as a separate chunk
106
for (const word of words) {
107
if (word.trim()) {
108
controller.enqueue(word.trim());
109
}
110
}
111
}
112
});
113
114
// Transform stream with error handling
115
const validationTransform = new TransformStream({
116
transform(chunk, controller) {
117
try {
118
const data = JSON.parse(chunk.toString());
119
120
if (!data.id) {
121
controller.error(new Error("Missing required 'id' field"));
122
return;
123
}
124
125
controller.enqueue(data);
126
} catch (error) {
127
controller.error(new Error(`Invalid JSON: ${error.message}`));
128
}
129
}
130
});
131
132
// Transform stream that terminates early
133
const takeFirstN = (n: number) => {
134
let count = 0;
135
return new TransformStream({
136
transform(chunk, controller) {
137
if (count < n) {
138
controller.enqueue(chunk);
139
count++;
140
} else {
141
controller.terminate(); // Stop processing more chunks
142
}
143
}
144
});
145
};
146
```
147
148
## Transformer Types
149
150
### Transformer Interface
151
152
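Configuration object for a transform stream that defines how data is transformed. Every member is optional; a transform stream constructed without a `transform` callback passes chunks through unchanged.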
153
154
```typescript { .api }
155
interface Transformer<I = any, O = any> {
156
/** Called immediately during construction of the TransformStream */
157
start?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
158
159
/** Called when a new chunk of data is ready to be transformed */
160
transform?: (chunk: I, controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
161
162
/** Called after all chunks written to the writable side have been transformed */
163
flush?: (controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
164
165
/** Called when the readable side is cancelled or the writable side is aborted */
166
cancel?: (reason: any) => void | PromiseLike<void>;
167
168
/** Must be undefined for default transform streams */
169
readableType?: undefined;
170
171
/** Must be undefined for default transform streams */
172
writableType?: undefined;
173
}
174
```
175
176
**Usage Examples:**
177
178
```typescript
179
import { TransformStream } from "web-streams-polyfill";
180
181
// JSON processing transform stream
182
const jsonProcessor = new TransformStream({
183
start(controller) {
184
console.log("Starting JSON processing");
185
this.buffer = '';
186
},
187
188
transform(chunk, controller) {
189
this.buffer += chunk.toString();
190
191
// Process complete JSON objects
192
let startIndex = 0;
193
let braceCount = 0;
194
195
for (let i = 0; i < this.buffer.length; i++) {
196
if (this.buffer[i] === '{') braceCount++;
197
if (this.buffer[i] === '}') braceCount--;
198
199
if (braceCount === 0 && i > startIndex) {
200
const jsonStr = this.buffer.slice(startIndex, i + 1);
201
try {
202
const obj = JSON.parse(jsonStr);
203
controller.enqueue(obj);
204
} catch (error) {
205
controller.error(new Error(`Invalid JSON: ${error.message}`));
206
return;
207
}
208
startIndex = i + 1;
209
}
210
}
211
212
// Keep remaining incomplete JSON in buffer
213
this.buffer = this.buffer.slice(startIndex);
214
},
215
216
flush(controller) {
217
if (this.buffer.trim()) {
218
controller.error(new Error("Incomplete JSON at end of stream"));
219
}
220
console.log("JSON processing completed");
221
}
222
});
223
224
// Compression transform stream
225
const compressionTransform = new TransformStream({
226
start(controller) {
227
this.chunks = [];
228
},
229
230
transform(chunk, controller) {
231
// Collect chunks for batch compression
232
this.chunks.push(chunk);
233
234
// Flush when we have enough data
235
if (this.chunks.length >= 10) {
236
const combined = this.chunks.join('');
237
this.chunks = [];
238
239
// Simulate compression
240
const compressed = `compressed(${combined})`;
241
controller.enqueue(compressed);
242
}
243
},
244
245
flush(controller) {
246
// Flush remaining chunks
247
if (this.chunks.length > 0) {
248
const combined = this.chunks.join('');
249
const compressed = `compressed(${combined})`;
250
controller.enqueue(compressed);
251
}
252
}
253
});
254
255
// Rate limiting transform stream
256
const rateLimiter = (itemsPerSecond: number) => {
257
let lastTime = Date.now();
258
const interval = 1000 / itemsPerSecond;
259
260
return new TransformStream({
261
async transform(chunk, controller) {
262
const now = Date.now();
263
const timeDiff = now - lastTime;
264
265
if (timeDiff < interval) {
266
// Wait to maintain rate limit
267
await new Promise(resolve =>
268
setTimeout(resolve, interval - timeDiff)
269
);
270
}
271
272
controller.enqueue(chunk);
273
lastTime = Date.now();
274
}
275
});
276
};
277
```
278
279
## Callback Types
280
281
```typescript { .api }
282
type TransformerStartCallback<O> = (
283
controller: TransformStreamDefaultController<O>
284
) => void | PromiseLike<void>;
285
286
type TransformerTransformCallback<I, O> = (
287
chunk: I,
288
controller: TransformStreamDefaultController<O>
289
) => void | PromiseLike<void>;
290
291
type TransformerFlushCallback<O> = (
292
controller: TransformStreamDefaultController<O>
293
) => void | PromiseLike<void>;
294
295
type TransformerCancelCallback = (reason: any) => void | PromiseLike<void>;
296
```
297
298
## Common Transform Patterns
299
300
### Identity Transform
301
302
```typescript
303
// Pass through transform (no modification)
304
const identityTransform = new TransformStream();
305
306
// Equivalent to:
307
const identityTransform = new TransformStream({
308
transform(chunk, controller) {
309
controller.enqueue(chunk);
310
}
311
});
312
```
313
314
### Buffering Transform
315
316
```typescript
317
// Buffer chunks and emit arrays
318
const bufferTransform = (bufferSize: number) => new TransformStream({
319
start(controller) {
320
this.buffer = [];
321
},
322
323
transform(chunk, controller) {
324
this.buffer.push(chunk);
325
326
if (this.buffer.length >= bufferSize) {
327
controller.enqueue([...this.buffer]);
328
this.buffer = [];
329
}
330
},
331
332
flush(controller) {
333
if (this.buffer.length > 0) {
334
controller.enqueue([...this.buffer]);
335
}
336
}
337
});
338
```
339
340
### Async Transform
341
342
```typescript
343
// Transform with async operations
344
const asyncTransform = new TransformStream({
345
async transform(chunk, controller) {
346
try {
347
// Simulate async processing (e.g., API call)
348
const result = await processAsync(chunk);
349
controller.enqueue(result);
350
} catch (error) {
351
controller.error(error);
352
}
353
}
354
});
355
356
async function processAsync(data: any): Promise<any> {
357
// Simulate async work
358
await new Promise(resolve => setTimeout(resolve, 100));
359
return { processed: data, timestamp: Date.now() };
360
}
361
```