# Writable Streams

Writable streams in StreamX provide enhanced drain handling, batch writing support, and proper finish/close lifecycle management. They include integrated backpressure handling and support both individual writes and batch operations.

## Capabilities

### Writable Class

Creates a writable stream with enhanced lifecycle support and proper resource management.

```javascript { .api }
/**
 * Creates a new writable stream with enhanced lifecycle support
 * @param options - Configuration options for the writable stream
 */
class Writable extends Stream {
  constructor(options?: WritableOptions);

  /** Override this method to implement custom write logic */
  _write(data: any, cb: (err?: Error) => void): void;

  /** Override this method to implement batch write logic */
  _writev(batch: any[], cb: (err?: Error) => void): void;

  /** Lifecycle hook called before the first write operation */
  _open(cb: (err?: Error) => void): void;

  /** Cleanup hook called when the stream is destroyed */
  _destroy(cb: (err?: Error) => void): void;

  /** Hook called immediately when destroy() is first invoked */
  _predestroy(): void;

  /** Hook called just before 'finish' is emitted */
  _final(cb: (err?: Error) => void): void;

  /** Write data to the stream */
  write(data: any): boolean;

  /** End the writable stream gracefully */
  end(): Writable;

  /** Forcefully destroy the stream */
  destroy(err?: Error): void;
}

interface WritableOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;

  /** Optional function to map input data */
  map?: (data: any) => any;

  /** Optional function to calculate byte size of data */
  byteLength?: (data: any) => number;

  /** AbortSignal that triggers destroy when aborted */
  signal?: AbortSignal;

  /** Shorthand for _write method */
  write?: (data: any, cb: (err?: Error) => void) => void;

  /** Shorthand for _writev method */
  writev?: (batch: any[], cb: (err?: Error) => void) => void;

  /** Shorthand for _final method */
  final?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _open method */
  open?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _destroy method */
  destroy?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _predestroy method */
  predestroy?: () => void;
}
```

**Usage Examples:**

```javascript
const { Writable } = require('streamx');

// Basic writable stream
const writable = new Writable({
  write(data, cb) {
    console.log('Received:', data.toString());
    cb(); // Signal completion
  }
});

// Write some data
writable.write('Hello, ');
writable.write('World!');
writable.end(); // End the stream

writable.on('finish', () => {
  console.log('All writes completed');
});

// Writable with lifecycle hooks
const fileWriter = new Writable({
  open(cb) {
    console.log('Opening file for writing...');
    // Open file or resource
    cb();
  },

  write(data, cb) {
    console.log('Writing:', data.toString());
    // Write to file
    cb();
  },

  final(cb) {
    console.log('Finalizing writes...');
    // Flush buffers, etc.
    cb();
  },

  destroy(cb) {
    console.log('Closing file...');
    // Clean up resources
    cb();
  }
});
```

### Batch Writing

Implement `_writev` (or pass the `writev` option) to receive the currently queued writes as a single array instead of one call per item.

```javascript { .api }
/**
 * Override this method to implement batch write operations
 * @param batch - Array of data items to write
 * @param cb - Callback to signal completion
 */
_writev(batch: any[], cb: (err?: Error) => void): void;
```

**Batch Writing Example:**

```javascript
const { Writable } = require('streamx');

const batchWriter = new Writable({
  writev(batch, cb) {
    console.log(`Writing batch of ${batch.length} items:`);
    batch.forEach((item, index) => {
      console.log(`  ${index}: ${item.toString()}`);
    });
    cb(); // Signal that the whole batch has been handled
  }
});

// Writes that are queued together are delivered to writev as a batch
batchWriter.write('item 1');
batchWriter.write('item 2');
batchWriter.write('item 3');
batchWriter.end();
```
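A common reason to implement `writev` is to coalesce a batch into one payload before handing it to a slower sink. The sketch below illustrates that idea and is not part of StreamX: `joinBatch` is a hypothetical helper, and it assumes every item is either a `Buffer` or a value that converts sensibly with `String()`.

```javascript
// Hypothetical helper (not a StreamX API): merge a batch into one Buffer
// so the sink can be written to once per batch instead of once per item.
function joinBatch(batch) {
  const parts = batch.map((item) =>
    Buffer.isBuffer(item) ? item : Buffer.from(String(item))
  );
  return Buffer.concat(parts);
}

// Possible use inside a writable (not run here; 'out.log' is a placeholder):
//   const fs = require('fs');
//   const sink = new Writable({
//     writev(batch, cb) {
//       fs.appendFile('out.log', joinBatch(batch), cb);
//     }
//   });
```

With this shape, the completion callback is passed straight to the sink, so an I/O error reaches the stream through `cb`.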

### Static Methods

StreamX provides static utility methods for writable stream inspection and management.

```javascript { .api }
/**
 * Check if a writable stream is under backpressure
 * @param stream - The writable stream to check
 * @returns True if the stream is backpressured
 */
static isBackpressured(stream: Writable): boolean;

/**
 * Wait for a stream to drain the currently queued writes
 * @param stream - The writable stream to wait for
 * @returns Promise that resolves to true when drained, or false if the stream was destroyed
 */
static drained(stream: Writable): Promise<boolean>;
```

**Static Method Examples:**

```javascript
const { Writable } = require('streamx');

const writable = new Writable({
  write(data, cb) {
    // Simulate slow writing
    setTimeout(() => {
      console.log('Written:', data.toString());
      cb();
    }, 100);
  }
});

// Write data first, stopping as soon as write() reports backpressure
for (let i = 0; i < 10; i++) {
  const canContinue = writable.write(`Message ${i}`);
  if (!canContinue) {
    console.log('Backpressure detected');
    break;
  }
}

// Check backpressure once writes have been queued
if (Writable.isBackpressured(writable)) {
  console.log('Stream is backpressured, waiting...');
}

// Wait for the queued writes to drain
Writable.drained(writable).then((success) => {
  if (success) {
    console.log('Stream drained successfully');
  } else {
    console.log('Stream was destroyed');
  }
});
```

### Events

Writable streams emit the following events during their lifecycle.

```javascript { .api }
interface WritableEvents {
  /** Emitted when the stream buffer is drained and ready for more writes */
  'drain': () => void;

  /** Emitted when all writes have been flushed after end() is called */
  'finish': () => void;

  /** Emitted when the stream has been fully closed */
  'close': () => void;

  /** Emitted when an error occurs */
  'error': (err: Error) => void;

  /** Emitted when a readable stream is piped to this writable */
  'pipe': (src: Readable) => void;
}
```
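To consume these events from promise-based code, `finish`, `error`, and `close` can be wrapped in a single promise. `whenFinished` below is a hypothetical helper, not a StreamX API. It assumes that `close` follows `finish` on success, and that a `close` with no preceding `finish` means the stream was destroyed early.

```javascript
// Hypothetical helper (not part of StreamX): resolves once the stream has
// emitted 'finish' and then 'close'; rejects on 'error' or on an early 'close'.
function whenFinished(stream) {
  return new Promise((resolve, reject) => {
    let finished = false;

    const onFinish = () => { finished = true; };
    const onError = (err) => { cleanup(); reject(err); };
    const onClose = () => {
      cleanup();
      if (finished) resolve();
      else reject(new Error('Stream closed before finishing'));
    };

    // Remove all three listeners so the helper leaves nothing behind
    function cleanup() {
      stream.removeListener('finish', onFinish);
      stream.removeListener('error', onError);
      stream.removeListener('close', onClose);
    }

    stream.on('finish', onFinish);
    stream.on('error', onError);
    stream.on('close', onClose);
  });
}

// Possible use (not run here):
//   writable.end();
//   await whenFinished(writable);
```

Attach the helper before calling `end()` so that no event is missed.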

### Properties

```javascript { .api }
interface WritableProperties {
  /** Boolean indicating whether the stream has been destroyed */
  destroyed: boolean;
}
```

### Advanced Configuration

The options below cover backpressure tuning, input mapping, and cancellation.

**Backpressure Handling:**

```javascript
const { Writable } = require('streamx');

const writable = new Writable({
  highWaterMark: 1024, // Small buffer for demonstration

  write(data, cb) {
    console.log('Writing:', data.toString());
    // Simulate async write
    setTimeout(cb, 10);
  }
});

let next = 0;

// Write chunks until write() returns false, then resume on 'drain'
function writeChunks() {
  while (next < 100) {
    const canContinue = writable.write(`Data chunk ${next++}`);

    if (!canContinue) {
      console.log('Backpressure detected, waiting for drain...');
      writable.once('drain', writeChunks);
      return;
    }
  }

  writable.end(); // All chunks queued
}

writeChunks();
```
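How quickly `highWaterMark` is reached depends on how each write is sized, which is what the `byteLength` option in `WritableOptions` controls. The sketch below shows one possible sizing function; `byteLengthOf` is a hypothetical name, and counting unknown values as a single unit is an assumption made for illustration, not StreamX's default behavior.

```javascript
// Hypothetical sizing function for the `byteLength` option (not a StreamX API).
function byteLengthOf(data) {
  // Strings: count encoded UTF-8 bytes, not characters
  if (typeof data === 'string') return Buffer.byteLength(data, 'utf8');
  // Buffers and other typed arrays already report their size
  if (ArrayBuffer.isView(data)) return data.byteLength;
  // Assumption for this sketch: anything else counts as one unit
  return 1;
}

// Possible use (not run here):
//   const writable = new Writable({
//     highWaterMark: 1024,
//     byteLength: byteLengthOf,
//     write(data, cb) { cb(); }
//   });
```

Sizing strings by their encoded length keeps the buffer limit meaningful when writes contain multi-byte characters.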

**Data Transformation:**

```javascript
const { Writable } = require('streamx');

const transformWriter = new Writable({
  map: (data) => {
    // Transform data before it reaches write()
    if (typeof data === 'string') {
      return Buffer.from(data.toUpperCase());
    }
    return data;
  },

  write(data, cb) {
    console.log('Transformed data:', data.toString());
    cb();
  }
});

transformWriter.write('hello world'); // Will be transformed to uppercase
```

**AbortSignal Integration:**

```javascript
const { Writable } = require('streamx');

const controller = new AbortController();

const writable = new Writable({
  signal: controller.signal,

  write(data, cb) {
    // Check if aborted before processing
    if (controller.signal.aborted) {
      return cb(new Error('Aborted'));
    }

    console.log('Writing:', data.toString());
    cb();
  }
});

// Aborting destroys the stream; an 'error' listener avoids an
// unhandled 'error' event if the stream reports the abort as an error
writable.on('error', (err) => {
  console.error('Stream error:', err.message);
});

writable.on('close', () => {
  console.log('Stream closed');
});

// Write some data
writable.write('test data');

// Abort after 1 second
setTimeout(() => {
  controller.abort();
  console.log('Write operation aborted');
}, 1000);
```

### Error Handling

Passing an error to a write callback fails the stream: it emits `'error'`, is destroyed so that cleanup runs, and then emits `'close'`.

```javascript
const { Writable } = require('streamx');

const errorProneWriter = new Writable({
  write(data, cb) {
    if (data.toString().includes('error')) {
      // Pass error to callback
      return cb(new Error('Write failed'));
    }

    console.log('Successfully wrote:', data.toString());
    cb();
  }
});

errorProneWriter.on('error', (err) => {
  console.error('Stream error:', err.message);
});

errorProneWriter.on('close', () => {
  console.log('Stream closed (cleanup completed)');
});

// This will trigger an error
errorProneWriter.write('This contains error');
```
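When the underlying I/O is promise-based, rejections have to be routed into the write callback so that they surface through the error path shown above. The adapter below is a sketch of one way to do that; `fromAsync` is a hypothetical helper, not something StreamX provides.

```javascript
// Hypothetical adapter (not a StreamX API): turn an async function into a
// callback-style `write` implementation.
function fromAsync(writeAsync) {
  return function write(data, cb) {
    Promise.resolve()
      // Wrapping the call in then() also catches synchronous throws
      .then(() => writeAsync(data))
      // Two-argument then(): cb is invoked exactly once, either way
      .then(() => cb(), (err) => cb(err));
  };
}

// Possible use (not run here; saveRecord is a placeholder for real I/O):
//   const sink = new Writable({
//     write: fromAsync(async (data) => { await saveRecord(data); })
//   });
```

Using the two-argument form of `then` means an exception thrown by `cb` itself is not fed back into `cb` a second time.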