A tiny wrapper around Node.js streams.Transform to avoid explicit subclassing noise
npx @tessl/cli install tessl/npm-through2@4.0.00
# through2
1
2
through2 is a tiny wrapper around Node.js streams.Transform that eliminates the complexity of explicit subclassing and prototype chain setup. It provides a functional approach to creating transform streams through simple function parameters, supporting both binary and object modes with optional transform and flush functions.
3
4
## Package Information
5
6
- **Package Name**: through2
7
- **Package Type**: npm
8
- **Language**: JavaScript
9
- **Installation**: `npm install through2`
10
11
## Core Imports
12
13
```javascript
14
const through2 = require('through2');
15
```
16
17
For ESM environments:
18
19
```javascript
20
import through2 from 'through2';
21
```
22
23
## Basic Usage
24
25
```javascript
26
const fs = require('fs');
27
const through2 = require('through2');
28
29
// Transform text data
30
fs.createReadStream('input.txt')
31
.pipe(through2(function (chunk, enc, callback) {
32
// Transform chunk (replace 'a' with 'z')
33
for (let i = 0; i < chunk.length; i++) {
34
if (chunk[i] == 97) chunk[i] = 122;
35
}
36
this.push(chunk);
37
callback();
38
}))
39
.pipe(fs.createWriteStream('output.txt'));
40
41
// Object mode transformation
42
const objectStream = through2.obj(function (obj, enc, callback) {
43
obj.processed = true;
44
obj.timestamp = Date.now();
45
this.push(obj);
46
callback();
47
});
48
```
49
50
## Capabilities
51
52
### Transform Stream Creation
53
54
Creates a transform stream instance with optional transform and flush functions.
55
56
```javascript { .api }
57
/**
58
* Creates a transform stream with optional transformation logic
59
* @param {Object} options - Stream options (optional)
60
* @param {Function} transform - Transform function (optional)
61
* @param {Function} flush - Flush function (optional)
62
* @returns {Transform} Transform stream instance
63
*/
64
function through2(options, transform, flush);
65
66
/**
67
* Transform function signature
68
* @param {Buffer|String|any} chunk - Data chunk to transform
69
* @param {String} encoding - Encoding (for string chunks)
70
* @param {Function} callback - Completion callback
71
*/
72
function transform(chunk, encoding, callback);
73
74
/**
75
* Flush function signature
76
* @param {Function} callback - Completion callback
77
*/
78
function flush(callback);
79
```
80
81
**Usage Examples:**
82
83
```javascript
84
// Basic transform with all parameters
85
const transform = through2(
86
{ objectMode: false },
87
function (chunk, enc, callback) {
88
// Process chunk
89
this.push(chunk.toString().toUpperCase());
90
callback();
91
},
92
function (callback) {
93
// Final processing
94
this.push('\n-- END --\n');
95
callback();
96
}
97
);
98
99
// Transform function only
100
const simpleTransform = through2(function (chunk, enc, callback) {
101
callback(null, chunk); // Pass through unchanged
102
});
103
104
// No-op transform (pass-through)
105
const passThrough = through2();
106
```
107
108
### Object Mode Transform Stream
109
110
Creates transform streams optimized for object processing with `objectMode: true` and `highWaterMark: 16`.
111
112
```javascript { .api }
113
/**
114
* Creates an object mode transform stream
115
* @param {Object} options - Stream options (optional)
116
* @param {Function} transform - Transform function (optional)
117
* @param {Function} flush - Flush function (optional)
118
* @returns {Transform} Object mode transform stream
119
*/
120
function through2.obj(options, transform, flush);
121
```
122
123
**Usage Examples:**
124
125
```javascript
126
// Object transformation
127
const processor = through2.obj(function (obj, enc, callback) {
128
if (obj.type === 'user') {
129
obj.processedAt = new Date().toISOString();
130
this.push(obj);
131
}
132
callback();
133
});
134
135
// With options override
136
const customProcessor = through2.obj(
137
{ highWaterMark: 32 },
138
function (obj, enc, callback) {
139
this.push({ ...obj, id: Math.random() });
140
callback();
141
}
142
);
143
```
144
145
### Constructor Function Creation
146
147
Creates reusable constructor functions for transform streams, useful when the same transform logic needs to be used in multiple instances.
148
149
```javascript { .api }
150
/**
151
* Creates a reusable transform stream constructor
152
* @param {Object} options - Default stream options (optional)
153
* @param {Function} transform - Transform function
154
* @param {Function} flush - Flush function (optional)
155
* @returns {Function} Constructor function
156
*/
157
function through2.ctor(options, transform, flush);
158
159
/**
160
* Constructor function signature
161
* @param {Object} override - Options to override defaults (optional)
162
* @returns {Transform} Transform stream instance
163
*/
164
function Through2Constructor(override);
165
```
166
167
**Usage Examples:**
168
169
```javascript
170
// Create reusable constructor
171
const TextProcessor = through2.ctor(
172
{ encoding: 'utf8' },
173
function (chunk, enc, callback) {
174
this.push(chunk.toString().toLowerCase());
175
callback();
176
}
177
);
178
179
// Create instances
180
const processor1 = new TextProcessor();
181
const processor2 = TextProcessor(); // 'new' is optional
182
const processor3 = TextProcessor({ objectMode: true }); // Override options
183
184
// Object mode constructor
185
const ObjectProcessor = through2.ctor(
186
{ objectMode: true },
187
function (record, enc, callback) {
188
if (record.temp != null && record.unit === 'F') {
189
record.temp = ((record.temp - 32) * 5) / 9;
190
record.unit = 'C';
191
}
192
this.push(record);
193
callback();
194
}
195
);
196
```
197
198
## Transform Function Details
199
200
Transform functions receive three parameters and must call the callback to signal completion:
201
202
```javascript { .api }
203
/**
204
* Transform function implementation pattern
205
* @param {Buffer|String|any} chunk - Input data chunk
206
* @param {String} encoding - Character encoding for string chunks
207
* @param {Function} callback - Completion callback
208
*/
209
function transformFunction(chunk, encoding, callback) {
210
// Process the chunk
211
// Option 1: Push data and call callback
212
this.push(processedData);
213
callback();
214
215
// Option 2: Use callback shorthand
216
callback(null, processedData);
217
218
// Option 3: Signal error
219
callback(new Error('Processing failed'));
220
221
// Option 4: Drop chunk (don't push anything)
222
callback();
223
}
224
```
225
226
## Flush Function Details
227
228
Flush functions are called when the stream is ending and no more data will be written:
229
230
```javascript { .api }
231
/**
232
* Flush function implementation pattern
233
* @param {Function} callback - Completion callback
234
*/
235
function flushFunction(callback) {
236
// Perform final processing
237
// Push any remaining data
238
this.push(finalData);
239
callback();
240
241
// Or signal completion without data
242
callback();
243
244
// Or signal error
245
callback(new Error('Flush failed'));
246
}
247
```
248
249
## Stream Options
250
251
All functions accept standard Node.js Transform stream options:
252
253
```javascript { .api }
254
interface StreamOptions {
255
/** Enable object mode for non-binary data */
256
objectMode?: boolean;
257
/** Internal buffer size */
258
highWaterMark?: number;
259
/** Keep readable side open when writable ends */
260
allowHalfOpen?: boolean;
261
/** Decode strings to Buffers before passing to _transform */
262
decodeStrings?: boolean;
263
/** Default string encoding */
264
encoding?: string;
265
/** Destroy stream if writable side ends */
266
autoDestroy?: boolean;
267
/** Emit 'close' after 'finish' and 'end' */
268
emitClose?: boolean;
269
}
270
```
271
272
## Error Handling and Stream Control
273
274
```javascript
275
// Error handling in transform function
276
const errorTransform = through2(function (chunk, enc, callback) {
277
try {
278
const result = riskyOperation(chunk);
279
callback(null, result);
280
} catch (error) {
281
callback(error); // Stream will emit 'error' event
282
}
283
});
284
285
// Stream destruction
286
const stream = through2();
287
stream.destroy(); // Destroys stream and emits 'close'
288
289
// Stream can be destroyed multiple times safely
290
stream.destroy();
291
stream.destroy(); // No additional effects
292
```
293
294
## Advanced Usage Patterns
295
296
```javascript
297
// Stateful transformation
298
const counter = through2(function (chunk, enc, callback) {
299
if (!this._count) this._count = 0;
300
this._count++;
301
302
this.push(`${this._count}: ${chunk}`);
303
callback();
304
});
305
306
// Conditional processing
307
const filter = through2.obj(function (obj, enc, callback) {
308
if (obj.include) {
309
this.push(obj);
310
}
311
// Skip objects without 'include' property
312
callback();
313
});
314
315
// Batch processing with flush
316
const batcher = through2.obj(
317
function (obj, enc, callback) {
318
if (!this._batch) this._batch = [];
319
this._batch.push(obj);
320
321
if (this._batch.length >= 10) {
322
this.push(this._batch);
323
this._batch = [];
324
}
325
callback();
326
},
327
function (callback) {
328
// Push remaining items
329
if (this._batch && this._batch.length > 0) {
330
this.push(this._batch);
331
}
332
callback();
333
}
334
);
335
```