# Stream API

Core streaming API of `stream-transform`. `transform()` returns a `Transformer`, a subclass of Node.js `stream.Transform` that always runs in object mode. Each record written to it is passed to a user-defined handler, and the handler's result is pushed to the readable side. Because it is a regular transform stream, it can be piped and respects backpressure.

## Capabilities

### Transform Function

Creates a transform stream that passes each record to a handler function. An options object can optionally be supplied first.

```javascript { .api }
/**
 * Create a transform stream with handler function
 * @param handler - Function to transform each record
 * @param callback - Optional completion callback `(err, results)`; when provided,
 *   the output is consumed automatically and collected into `results`
 * @returns Transformer stream instance
 */
function transform<T, U>(handler: Handler<T, U>, callback?: Callback): Transformer;

/**
 * Create a transform stream with options and handler
 * @param options - Configuration options for the transformer
 * @param handler - Function to transform each record
 * @param callback - Optional completion callback `(err, results)`; when provided,
 *   the output is consumed automatically and collected into `results`
 * @returns Transformer stream instance
 */
function transform<T, U>(options: Options, handler: Handler<T, U>, callback?: Callback): Transformer;
```

**Usage Examples:**

```javascript
import { transform } from "stream-transform";
// parse() and stringify() come from the companion csv-parse and csv-stringify packages
import { parse } from "csv-parse";
import { stringify } from "csv-stringify";
import { createReadStream, createWriteStream } from "fs";

// Basic stream transformation: upper-case every field of an array record
const upperCaser = transform((record) => {
  return record.map((field) => field.toUpperCase());
});

// Stream with options: params is passed as the handler's last argument
const prefixer = transform({
  parallel: 50,
  params: { prefix: "processed_" }
}, (record, params) => {
  return params.prefix + record.join(",");
});

// Auto-consumption with callback: results are collected in memory
const collector = transform((record) => record, (err, results) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(`Processed ${results.length} records`);
});

// Pipe data through a transformer: parse CSV rows into arrays,
// transform them, then serialize them back to CSV text
createReadStream("input.csv")
  .pipe(parse())
  .pipe(upperCaser)
  .pipe(stringify())
  .pipe(createWriteStream("output.csv"));
```
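
The two overloads differ only in whether an options object comes first, so they can be told apart by argument type. As a rough sketch of that resolution (not `stream-transform`'s actual source), the hypothetical `normalizeArgs` helper below treats the first function as the handler, a second function as the completion callback, and a plain object as options.

```javascript
// Hypothetical helper: resolves the (options?, handler, callback?) overloads
// by inspecting argument types. It is not exported by stream-transform.
function normalizeArgs(...args) {
  let options = {};
  let handler;
  let callback;
  for (const arg of args) {
    if (typeof arg === "function") {
      // The first function is the handler; a second one is the callback
      if (handler === undefined) handler = arg;
      else if (callback === undefined) callback = arg;
      else throw new TypeError("Too many function arguments");
    } else if (arg !== null && typeof arg === "object" && !Array.isArray(arg)) {
      options = { ...arg }; // shallow copy so the caller's object is not mutated
    } else if (arg !== undefined && arg !== null) {
      throw new TypeError(`Unexpected argument: ${String(arg)}`);
    }
  }
  if (handler === undefined) throw new TypeError("A handler function is required");
  return { options, handler, callback };
}

// Usage
const upper = (record) => record.toUpperCase();
const done = (err, results) => {};
const a = normalizeArgs(upper);
const b = normalizeArgs({ parallel: 5 }, upper, done);
console.log(a.options, a.callback === undefined); // {} true
console.log(b.options.parallel, b.callback === done); // 5 true
```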

### Transformer Class

The stream class returned by `transform()`. It extends Node.js `stream.Transform` and exposes the resolved configuration and live progress counters.

```javascript { .api }
/**
 * Transform stream class for data processing
 */
class Transformer extends stream.Transform {
  /**
   * Create a new Transformer instance
   * @param options - Configuration options
   */
  constructor(options: Options);

  /** Configuration options (read-only) */
  readonly options: Options;

  /** Current transformation state (read-only) */
  readonly state: State;
}
```

**Properties:**

- `options`: Read-only configuration object, with defaults applied for `consume`, `parallel`, and `params`
- `state`: Live counters (`started`, `running`, `finished`, `paused`) updated as records are processed

**Usage Examples:**

```javascript
import { transform } from "stream-transform";

const transformer = transform({
  parallel: 10,
  consume: true, // drain the output internally; no downstream consumer needed
  params: { multiplier: 2 }
}, (record, params) => record * params.multiplier);

// Access configuration
console.log(transformer.options.parallel); // 10
console.log(transformer.options.params.multiplier); // 2

// Monitor progress
transformer.on('readable', () => {
  console.log(`Progress: ${transformer.state.finished}/${transformer.state.started}`);
});

// Feed a few numeric records, then signal the end of input
[1, 2, 3].forEach((n) => transformer.write(n));
transformer.end();
```
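
The `options` property reflects the configuration after defaults are filled in. A minimal sketch of that step, assuming the defaults documented under Configuration Options (`consume: false`, `parallel: 100`, `params: null`) plus forced object mode. `resolveOptions` is a hypothetical helper, not an export of the library.

```javascript
// Hypothetical helper mirroring the documented defaults; not stream-transform's source.
function resolveOptions(options = {}) {
  const resolved = { ...options };
  // `??` only replaces undefined/null, so explicit values such as 0 or false survive
  resolved.consume = resolved.consume ?? false;
  resolved.parallel = resolved.parallel ?? 100;
  // params stays null unless the caller supplied a value
  if (resolved.params === undefined) resolved.params = null;
  // Records are arbitrary JavaScript values, so object mode is always on
  resolved.objectMode = true;
  return resolved;
}

console.log(resolveOptions());
// { consume: false, parallel: 100, params: null, objectMode: true }
console.log(resolveOptions({ parallel: 10, params: { multiplier: 2 } }).params.multiplier);
// 2
```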

### Stream Events

Transformer streams emit the standard Node.js stream events listed below. For progress monitoring, read `transformer.state` (see State Monitoring).

```javascript { .api }
// Standard stream events
transformer.on('readable', () => { /* Output is available to read() */ });
transformer.on('end', () => { /* Readable side: all output has been consumed */ });
transformer.on('finish', () => { /* Writable side: end() was called and all input has been handled */ });
transformer.on('error', (err) => { /* Error occurred during processing */ });
transformer.on('close', () => { /* Stream has been destroyed */ });

// Piping from a source into the transformer
transformer.on('pipe', (src) => { /* src.pipe(transformer) was called */ });
transformer.on('unpipe', (src) => { /* src was unpiped from the transformer */ });
```

**Usage Examples:**

```javascript
import { transform } from "stream-transform";

// processRecord is an application-defined function
const transformer = transform((record) => processRecord(record));

// Handle events
transformer.on('readable', function() {
  let record;
  while ((record = this.read()) !== null) {
    console.log('Processed:', record);
  }
});

transformer.on('error', (err) => {
  console.error('Transformation error:', err);
});

transformer.on('finish', () => {
  console.log('All records processed');
  console.log(`Final stats: ${transformer.state.finished} completed`);
});
```
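
The split between `'finish'` (writable side) and `'end'` (readable side) can be observed on any Node.js `Transform`. The sketch below uses only `node:stream`. The upper-casing transform is a hypothetical stand-in for a `Transformer`, and the example records which events fire while the output is consumed.

```javascript
import { Transform } from "node:stream";

// Hypothetical stand-in for a Transformer: upper-cases each string record
const upper = new Transform({
  objectMode: true,
  transform(record, _encoding, callback) {
    callback(null, record.toUpperCase());
  },
});

const events = [];
const output = [];

upper.on("finish", () => events.push("finish")); // writable side done
upper.on("end", () => events.push("end"));       // readable side fully consumed
upper.on("data", (record) => output.push(record)); // 'data' switches the stream to flowing mode

// 'close' follows once both sides are done (autoDestroy is on by default)
const done = new Promise((resolve, reject) => {
  upper.on("close", resolve);
  upper.on("error", reject);
});

upper.write("a");
upper.write("b");
upper.end();

done.then(() => console.log("events:", events, "output:", output));
```

If nothing reads the output (no `'data'` or `'readable'` consumer, no pipe, and no `consume: true`), `'end'` is never emitted, which is the situation the `consume` option addresses.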

### Handler Functions

User-defined functions that transform one record at a time. Three execution styles are supported: synchronous, callback-based, and promise-based.

```javascript { .api }
/**
 * Synchronous handler - returns result directly
 */
type SyncHandler<T, U> = (record: T, params?: any) => U;

/**
 * Asynchronous handler - uses callback for result
 */
type AsyncHandler<T, U> = (record: T, callback: HandlerCallback<U>, params?: any) => void;

/**
 * Promise-based handler - returns Promise of result
 */
type PromiseHandler<T, U> = (record: T, params?: any) => Promise<U>;

/**
 * Generic handler type (union of all patterns)
 */
type Handler<T, U> = SyncHandler<T, U> | AsyncHandler<T, U> | PromiseHandler<T, U>;

/**
 * Callback function for asynchronous handlers
 */
type HandlerCallback<T = any> = (err?: null | Error, record?: T) => void;
```
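
**Handler Detection:**

The library detects the handler style from its declared parameter count (`handler.length`), discounting the trailing `params` argument when the `params` option is set:

- **1 parameter** (+ optional params): Synchronous handler
- **2 parameters** (+ optional params): Asynchronous handler (second param is callback)
- **Return value with a `.then` method**: a 1-parameter handler whose result is thenable is treated as promise-based

Because detection relies on the parameter count, a handler declared as `(record, params)` is only treated as synchronous when the `params` option is set; otherwise its second argument is taken to be the callback.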
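
The detection rules can be sketched with `Function.prototype.length`. `detectHandlerKind` and `isThenable` are hypothetical helpers written for illustration, not the library's internals. Note that `length` does not count rest parameters or parameters with default values, so a handler declared as `(record, callback = () => {})` reports a length of 1.

```javascript
// Hypothetical classifier applying the detection rules; not stream-transform's source.
// `hasParams` should be true when the `params` option is set to a non-null value.
function detectHandlerKind(handler, hasParams) {
  // Discount the trailing params argument when the params option is in use
  const arity = handler.length - (hasParams ? 1 : 0);
  if (arity === 1) return "sync-or-promise"; // told apart later by the return value
  if (arity === 2) return "callback";
  throw new TypeError(`Unsupported handler arity: ${handler.length}`);
}

// A one-parameter handler whose result has a callable .then is treated as promise-based
function isThenable(value) {
  return value !== null &&
    (typeof value === "object" || typeof value === "function") &&
    typeof value.then === "function";
}

console.log(detectHandlerKind((record) => record, false));         // sync-or-promise
console.log(detectHandlerKind(async (record) => record, false));   // sync-or-promise
console.log(detectHandlerKind((record, callback) => {}, false));   // callback
console.log(detectHandlerKind((record, params) => record, true));  // sync-or-promise
console.log(detectHandlerKind((record, params) => record, false)); // callback (params option unset)
```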

**Usage Examples:**

```javascript
import { transform } from "stream-transform";

// Synchronous handler
const syncTransformer = transform((record) => {
  return record.map((field) => field.trim());
});

// Asynchronous handler
const asyncTransformer = transform((record, callback) => {
  setTimeout(() => {
    callback(null, record.join("|"));
  }, 10);
});

// Promise-based handler (processAsync is an application-defined async function)
const promiseTransformer = transform(async (record) => {
  const result = await processAsync(record);
  return result;
});

// Handler with params
const paramsTransformer = transform({
  params: { separator: "|", prefix: "row_" }
}, (record, params) => {
  return params.prefix + record.join(params.separator);
});
```
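
Since every style yields one result per record, the three can be normalized into a single promise-returning call. The hypothetical `invokeHandler` adapter below (not part of `stream-transform`) reuses the arity rule from Handler Detection and passes `params` as the last argument only when it is non-null.

```javascript
// Hypothetical adapter (not part of stream-transform): runs a sync, callback-based,
// or promise-based handler and always returns a Promise of its result.
function invokeHandler(handler, record, params = null) {
  // Arity rule from Handler Detection: discount params when it is set
  const arity = handler.length - (params !== null ? 1 : 0);
  if (arity === 2) {
    // Callback style: (record, callback[, params])
    return new Promise((resolve, reject) => {
      const callback = (err, result) => (err ? reject(err) : resolve(result));
      if (params !== null) handler(record, callback, params);
      else handler(record, callback);
    });
  }
  if (arity === 1) {
    // Sync or promise style: returning from .then() adopts a returned promise,
    // and a synchronous throw becomes a rejection instead of escaping
    return Promise.resolve().then(() =>
      params !== null ? handler(record, params) : handler(record)
    );
  }
  return Promise.reject(new TypeError(`Unsupported handler arity: ${handler.length}`));
}

// Usage: the three styles, normalized to the same Promise-based call
const sync = (record) => record.trim();
const cb = (record, callback) => setTimeout(() => callback(null, record.length), 5);
const prom = async (record, params) => params.prefix + record;

Promise.all([
  invokeHandler(sync, "  a  "),
  invokeHandler(cb, "abc"),
  invokeHandler(prom, "x", { prefix: "row_" }),
]).then((results) => console.log(results)); // [ 'a', 3, 'row_x' ]
```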

### Configuration Options

Options accepted by `transform()`. In addition to the fields below, standard `stream.TransformOptions` such as `highWaterMark` are passed through to the underlying Node.js stream.

```javascript { .api }
interface Options extends stream.TransformOptions {
  /**
   * Consume the output automatically, so the stream runs to completion
   * even when nothing reads from it or pipes it onward
   * @default false
   */
  consume?: boolean;

  /**
   * Maximum number of transformations running concurrently
   * (only has an effect for callback-based and promise-based handlers)
   * @default 100
   */
  parallel?: number;

  /**
   * User-defined value passed as the last argument to the handler
   * @default null
   */
  params?: any;
}
```

**Usage Examples:**

```javascript
import { transform } from "stream-transform";

// processRecord, fetchDataForRecord and enrichRecord are application-defined

// Auto-consumption for standalone processing
const autoConsumer = transform({
  consume: true
}, (record) => processRecord(record));

// High concurrency for I/O-bound operations
const highConcurrency = transform({
  parallel: 500
}, async (record) => {
  return await fetchDataForRecord(record);
});

// Custom parameters
const withParams = transform({
  params: {
    apiKey: process.env.API_KEY,
    timeout: 5000
  }
}, (record, params) => {
  return enrichRecord(record, params.apiKey, params.timeout);
});

// Inherit stream options
const customStream = transform({
  highWaterMark: 64, // object mode: counted in records, not bytes
  objectMode: true // redundant: transform() always enables object mode
}, (record) => record);
```
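
`parallel` caps how many transformations are in flight at once, which is why raising it mainly helps I/O-bound handlers. The idea can be shown outside a stream with a hypothetical `mapWithLimit` helper: at most `limit` promise-returning calls run concurrently, and the helper records the peak so the cap can be checked. This illustrates the concept only; `stream-transform` applies its own limit internally as records are written.

```javascript
// Hypothetical helper illustrating a cap like `parallel`; not stream-transform's code.
async function mapWithLimit(items, limit, fn) {
  if (!(limit >= 1)) throw new RangeError("limit must be at least 1");
  const results = new Array(items.length);
  let next = 0;    // index of the next item to start
  let running = 0; // calls currently in flight
  let peak = 0;    // highest observed value of `running`

  // Each worker keeps pulling the next unstarted item until none remain
  async function worker() {
    while (next < items.length) {
      const index = next++;
      running++;
      peak = Math.max(peak, running);
      try {
        results[index] = await fn(items[index]);
      } finally {
        running--;
      }
    }
  }

  // Start at most `limit` workers and wait for all of them to drain the queue
  await Promise.all(Array.from({ length: Math.min(limit, items.length) }, () => worker()));
  return { results, peak };
}

// Usage: ten simulated I/O calls of 10 ms each, at most three at a time
const delay = (ms, value) => new Promise((resolve) => setTimeout(resolve, ms, value));
const run = mapWithLimit([...Array(10).keys()], 3, (n) => delay(10, n * 2));
run.then(({ results, peak }) => console.log(results, `peak concurrency: ${peak}`));
```

Results are stored by index, so they come back in input order; that is a property of this sketch, not a statement about the order in which `stream-transform` emits records.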

### State Monitoring

Live counters describing transformation progress, exposed as `transformer.state`.

```javascript { .api }
interface State {
  /** Number of transformations that have completed successfully */
  finished: number;

  /** Number of transformations currently being processed */
  running: number;

  /** Total number of transformations that have been started */
  started: number;

  /** Whether the stream is currently paused due to backpressure */
  paused: boolean;
}
```

**Usage Examples:**

```javascript
import { transform } from "stream-transform";

// processRecord is an application-defined function
const transformer = transform((record) => processRecord(record));

// Monitor progress
const progressInterval = setInterval(() => {
  const { started, running, finished } = transformer.state;
  const completion = started ? (finished / started * 100).toFixed(1) : "0.0";
  console.log(`Progress: ${completion}% (${finished}/${started}, ${running} running)`);
}, 1000);

transformer.on('finish', () => {
  clearInterval(progressInterval);
  console.log(`Final: ${transformer.state.finished} records processed`);
});

// Detect processing bottlenecks while draining the output
transformer.on('readable', () => {
  if (transformer.state.running > transformer.options.parallel * 0.8) {
    console.warn('High concurrency usage detected');
  }
  // Read buffered output so backpressure does not stall the stream
  let record;
  while ((record = transformer.read()) !== null) {
    // Records are discarded here; pipe to a destination in real code
  }
});
```
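
To make the counter semantics concrete, the sketch below maintains the same four fields on a plain object-mode `Transform`. `CountingTransform` is hypothetical and deliberately simplified: it handles one record at a time (so `parallel` is not modelled and `running` never exceeds 1), and it only supports synchronous and promise-returning handlers.

```javascript
import { Transform } from "node:stream";

// Hypothetical, simplified stand-in for Transformer's state tracking.
// It processes one record at a time (no `parallel`), so `running` is at most 1.
class CountingTransform extends Transform {
  constructor(handler) {
    super({ objectMode: true });
    this.handler = handler;
    this.state = { started: 0, running: 0, finished: 0, paused: false };
  }

  _transform(record, _encoding, callback) {
    this.state.started++;
    this.state.running++;
    // Promise.resolve().then() accepts sync and promise-returning handlers alike
    Promise.resolve()
      .then(() => this.handler(record))
      .then(
        (result) => {
          this.state.running--;
          this.state.finished++; // only successful transformations count
          // push(null) would end the stream, so empty results are skipped
          if (result !== null && result !== undefined) {
            // push() returns false once the readable buffer is full (backpressure)
            this.state.paused = !this.push(result);
          }
          callback();
        },
        (err) => {
          this.state.running--;
          callback(err); // destroys the stream and emits 'error'
        }
      );
  }
}

// Usage
const counter = new CountingTransform(async (n) => n * 10);
const seen = [];
counter.on("data", (value) => seen.push(value));
counter.on("end", () => console.log(counter.state, seen));
for (const n of [1, 2, 3]) counter.write(n);
counter.end();
```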

### Error Handling

An error raised by a handler destroys the stream and is emitted as an `'error'` event; the stream cannot be resumed afterwards.

```javascript { .api }
// Errors are emitted as 'error' events
transformer.on('error', (err) => { /* err: Error */ });

// Handler errors automatically destroy the stream:
// - sync handlers: throw the error
// - callback-based handlers: call callback(error)
// - promise handlers: reject the promise
```

**Usage Examples:**

```javascript
import { transform } from "stream-transform";

// processRecord, processRecordAsync, riskyOperation and cleanup are application-defined

const transformer = transform((record) => {
  if (!record || record.length === 0) {
    throw new Error('Invalid record: empty or null');
  }
  return processRecord(record);
});

// Handle stream errors
transformer.on('error', (err) => {
  console.error('Transform error:', err.message);
  // Stream is automatically destroyed
  cleanup();
});

// Async handler error handling
const asyncTransformer = transform((record, callback) => {
  processRecordAsync(record, (err, result) => {
    if (err) {
      return callback(err); // Will emit 'error' event
    }
    callback(null, result);
  });
});

// Promise handler error handling
const promiseTransformer = transform(async (record) => {
  try {
    return await riskyOperation(record);
  } catch (err) {
    // Add context; `cause` keeps the original error for debugging
    throw new Error(`Failed to process record: ${err.message}`, { cause: err });
  }
});
```
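
When a transformer sits in a chain of streams, Node.js's `pipeline()` from `node:stream/promises` rejects with the first error from any stage and destroys the streams that are still open, so one `.catch()` or `try`/`catch` replaces per-stream `'error'` listeners. The sketch below uses a plain `Transform` as a stand-in for a `Transformer` whose handler rejects empty records; `validate`, `sink`, and the sample records are hypothetical.

```javascript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical stand-in for a Transformer: rejects empty records via the callback
const validate = new Transform({
  objectMode: true,
  transform(record, _encoding, callback) {
    if (!record || record.length === 0) {
      callback(new Error("Invalid record: empty or null"));
      return;
    }
    callback(null, record.join(","));
  },
});

// Hypothetical destination that collects whatever reaches it
const written = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    written.push(chunk);
    callback();
  },
});

const records = [["a", "b"], [], ["c", "d"]];

// pipeline() resolves when every stage finishes, or rejects with the first error
const result = pipeline(Readable.from(records), validate, sink).then(
  () => ({ ok: true }),
  (err) => ({ ok: false, message: err.message, destroyed: validate.destroyed })
);

result.then((outcome) => console.log(outcome, written));
```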