# Transform Streams

A Transform stream is a Duplex stream whose readable output is computed from the data written to it.

## Capabilities

### Transform Constructor

Creates a new Transform stream, optionally configured with transform and flush functions.

```javascript { .api }
/**
 * Creates a new Transform stream
 * @param opts - Configuration options including transformation function
 */
class Transform extends Duplex {
  constructor(opts?: TransformOptions);
}

interface TransformOptions extends DuplexOptions {
  /** Transform function shorthand */
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  /** Flush function shorthand */
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}
```

**Usage Example:**

```javascript
const { Transform } = require("@mafintosh/streamx");

const upperCaseTransform = new Transform({
  transform(data, cb) {
    const result = data.toString().toUpperCase();
    cb(null, result);
  }
});
```

### Transformation Logic

#### _transform Method

Called to transform each chunk of incoming data. Override it to implement the transformation, then either pass the result to the callback or emit it with `push()`.

```javascript { .api }
/**
 * Transform incoming data
 * @param data - Data to transform
 * @param callback - Callback with error and transformed result, or use push()
 */
_transform(data: any, callback: (error?: Error, result?: any) => void): void;
```

**Usage Example:**

```javascript
class JSONTransform extends Transform {
  _transform(data, cb) {
    try {
      const parsed = JSON.parse(data.toString());
      const transformed = {
        ...parsed,
        processed: true,
        timestamp: Date.now()
      };
      cb(null, JSON.stringify(transformed));
    } catch (err) {
      cb(err);
    }
  }
}
```

#### _flush Method

Called at the end of the transform, after all written data has been processed. Use it to emit any data that is still buffered.

```javascript { .api }
/**
 * Called to flush any remaining data at the end of transformation
 * @param callback - Callback with error and optional final data
 */
_flush(callback: (error?: Error, result?: any) => void): void;
```

#### _final Method

Called internally by the stream when it is ending. It calls `_flush` automatically, so override `_flush` rather than `_final` to emit trailing data.

```javascript { .api }
/**
 * Called internally when the transform stream is ending
 * @param callback - Callback to call when final processing is complete
 */
_final(callback: (error?: Error) => void): void;
```

### Transform Patterns

#### Using Callback

Pass the transformed result as the second argument to the callback:

```javascript
const csvToJson = new Transform({
  transform(data, cb) {
    // Assumes each chunk contains whole "name,age" lines; blank lines are skipped
    const lines = data.toString().split('\n').filter(line => line.trim());
    const result = lines.map(line => {
      const [name, age] = line.split(',');
      return JSON.stringify({ name, age: parseInt(age, 10) });
    }).join('\n');
    cb(null, result);
  }
});
```

#### Using Push

Call `push()` for each output chunk, then invoke the callback without a result:

```javascript
const splitTransform = new Transform({
  // Method shorthand (not an arrow function) so that `this` is the stream
  transform(data, cb) {
    const lines = data.toString().split('\n');
    lines.forEach(line => {
      if (line.trim()) {
        this.push(line.trim());
      }
    });
    cb(null); // No result via callback
  }
});
```

#### Multiple Output

Emit several output chunks for a single input chunk:

```javascript
const duplicateTransform = new Transform({
  transform(data, cb) {
    const str = data.toString();
    this.push(`Original: ${str}`);
    this.push(`Copy: ${str}`);
    this.push(`Uppercase: ${str.toUpperCase()}`);
    cb(null);
  }
});
```

### Common Transform Examples

#### Text Processing

```javascript
const { Transform } = require("@mafintosh/streamx");

// Line-by-line processor
class LineProcessor extends Transform {
  constructor() {
    super();
    this.buffer = '';
  }

  _transform(chunk, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');

    // Keep the last incomplete line in buffer
    this.buffer = lines.pop();

    // Process complete lines
    lines.forEach(line => {
      if (line.trim()) {
        const processed = `[${new Date().toISOString()}] ${line}`;
        this.push(processed);
      }
    });

    cb(null);
  }

  _flush(cb) {
    // Process any remaining data in buffer
    if (this.buffer.trim()) {
      const processed = `[${new Date().toISOString()}] ${this.buffer}`;
      this.push(processed);
    }
    cb(null);
  }
}
```
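
The buffering step in `LineProcessor` is easier to follow in isolation. The sketch below restates it as a pure function with no stream dependency; `splitLines` is a hypothetical name introduced here for illustration only.

```javascript
// Hypothetical helper: the buffering step from LineProcessor as a pure function.
// Returns the complete lines plus the trailing partial line to carry over.
function splitLines(buffer, chunk) {
  const parts = (buffer + chunk).split('\n');
  const rest = parts.pop(); // text after the last newline; '' if the chunk ended on one
  return { lines: parts, rest };
}

// A line split across two chunks is only emitted once its newline arrives.
const first = splitLines('', 'alpha\nbe');
const second = splitLines(first.rest, 'ta\ngamma');

console.log(first.lines, JSON.stringify(first.rest));   // [ 'alpha' ] "be"
console.log(second.lines, JSON.stringify(second.rest)); // [ 'beta' ] "gamma"
```

Whatever is left in `rest` after the last chunk (here `gamma`) is the text that `_flush` emits in the class above.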

#### Data Validation and Filtering

```javascript
class ValidatorTransform extends Transform {
  constructor(validator) {
    super();
    this.validate = validator;
    this.validCount = 0;
    this.invalidCount = 0;
  }

  _transform(data, cb) {
    try {
      const item = JSON.parse(data.toString());
      if (this.validate(item)) {
        this.validCount++;
        this.push(JSON.stringify(item));
      } else {
        this.invalidCount++;
        console.warn('Invalid item:', item);
      }
    } catch (err) {
      this.invalidCount++;
      console.error('Parse error:', err.message);
    }
    cb(null);
  }

  _flush(cb) {
    console.log(`Processed: ${this.validCount} valid, ${this.invalidCount} invalid`);
    cb(null);
  }
}

// Usage
const validator = new ValidatorTransform(item =>
  typeof item.name === 'string' && typeof item.age === 'number'
);
```

#### Compression/Encoding Transform

```javascript
const crypto = require('crypto');
const { Transform } = require("@mafintosh/streamx");

// Encodes each chunk independently. The outputs only concatenate into valid
// Base64 for the whole input when every chunk length is a multiple of 3.
class Base64Transform extends Transform {
  _transform(data, cb) {
    const encoded = Buffer.from(data).toString('base64');
    cb(null, encoded);
  }
}

// Emits one digest per chunk, not a single digest of the whole stream
class HashTransform extends Transform {
  constructor(algorithm = 'sha256') {
    super();
    this.algorithm = algorithm;
  }

  _transform(data, cb) {
    const hash = crypto.createHash(this.algorithm);
    hash.update(data);
    const result = hash.digest('hex');
    cb(null, result);
  }
}
```
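
Both classes above work chunk by chunk, which matters when the result is meant to describe the whole stream. The following sketch uses only Node.js built-ins (no stream involved) to show the difference; the chunk contents are arbitrary sample data.

```javascript
const crypto = require('crypto');

const chunks = [Buffer.from('hello'), Buffer.from(' world')];
const whole = Buffer.concat(chunks);

// Base64: encoding chunks separately places padding in the middle of the output.
const perChunkBase64 = chunks.map((c) => c.toString('base64')).join('');
const wholeBase64 = whole.toString('base64');

// Hashing: update() may be called once per chunk, with digest() once at the end.
const incremental = crypto.createHash('sha256');
for (const c of chunks) incremental.update(c);
const incrementalDigest = incremental.digest('hex');
const wholeDigest = crypto.createHash('sha256').update(whole).digest('hex');

console.log(perChunkBase64 === wholeBase64);    // false
console.log(incrementalDigest === wholeDigest); // true
```

One way to obtain a single digest from a transform would be to call `update()` in `_transform` and pass `digest()` to the `_flush` callback; that variant is a suggestion, not something the examples above implement.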

### Pass-through Behavior

A Transform with no `_transform` implementation acts as a pass-through stream:

```javascript
const passThrough = new Transform(); // Acts as pass-through

// Data written to it is emitted as readable data unchanged
passThrough.write("Hello");
passThrough.end();

passThrough.on('data', (chunk) => {
  console.log('Passed through:', chunk.toString()); // "Hello"
});
```

### Pipeline Usage

Transform streams can be chained with `pipe()`:

```javascript
const { Readable, Writable, Transform } = require("@mafintosh/streamx");

// Create source data
const source = new Readable({
  read(cb) {
    this.push('hello world\n');
    this.push('foo bar\n');
    this.push(null);
    cb(null);
  }
});

// Create transforms
const upperCase = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

const addPrefix = new Transform({
  transform(data, cb) {
    cb(null, `>> ${data}`);
  }
});

// Create destination
const destination = new Writable({
  write(data, cb) {
    console.log('Final:', data.toString());
    cb(null);
  }
});

// Pipeline
source.pipe(upperCase).pipe(addPrefix).pipe(destination, (err) => {
  if (err) console.error('Pipeline error:', err);
  else console.log('Pipeline completed');
});
```

### Advanced Transform Features

#### Stateful Transformation

```javascript
class CounterTransform extends Transform {
  constructor() {
    super();
    this.lineNumber = 0;
  }

  _transform(data, cb) {
    const lines = data.toString().split('\n');
    const result = lines.map(line => {
      if (line.trim()) {
        return `${++this.lineNumber}: ${line}`;
      }
      return line;
    }).join('\n');
    cb(null, result);
  }
}
```

#### Async Transformation

```javascript
class AsyncTransform extends Transform {
  async _transform(data, cb) {
    try {
      // Simulate async operation
      const result = await this.processAsync(data.toString());
      cb(null, result);
    } catch (err) {
      cb(err);
    }
  }

  async processAsync(data) {
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(data.toUpperCase());
      }, 100);
    });
  }
}
```

### Events

Transform streams inherit all events from Duplex streams:

```javascript
const transform = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

transform.on('data', (chunk) => {
  console.log('Transformed:', chunk.toString());
});

transform.on('finish', () => {
  console.log('All transforms completed');
});

transform.on('close', () => {
  console.log('Transform stream closed');
});

transform.write('hello');
transform.write('world');
transform.end();
```