or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

duplex-transform.mdindex.mdpipeline.mdreadable.mdwritable.md

writable.mddocs/

0

# Writable Streams

1

2

Writable streams in StreamX provide enhanced drain handling, batch writing support, and proper finish/close lifecycle management. They include integrated backpressure handling and support both individual writes and batch operations.

3

4

## Capabilities

5

6

### Writable Class

7

8

Creates a writable stream with enhanced lifecycle support and proper resource management.

9

10

```javascript { .api }

11

/**

12

* Creates a new writable stream with enhanced lifecycle support

13

* @param options - Configuration options for the writable stream

14

*/

15

class Writable extends Stream {

16

constructor(options?: WritableOptions);

17

18

/** Override this method to implement custom write logic */

19

_write(data: any, cb: (err?: Error) => void): void;

20

21

/** Override this method to implement batch write logic */

22

_writev(batch: any[], cb: (err?: Error) => void): void;

23

24

/** Lifecycle hook called before the first write operation */

25

_open(cb: (err?: Error) => void): void;

26

27

/** Cleanup hook called when the stream is destroyed */

28

_destroy(cb: (err?: Error) => void): void;

29

30

/** Hook called immediately when destroy() is first invoked */

31

_predestroy(): void;

32

33

/** Hook called just before 'finish' is emitted */

34

_final(cb: (err?: Error) => void): void;

35

36

/** Write data to the stream */

37

write(data: any): boolean;

38

39

/** End the writable stream gracefully */

40

end(): Writable;

41

42

/** Forcefully destroy the stream */

43

destroy(err?: Error): void;

44

}

45

46

interface WritableOptions {

47

/** Maximum buffer size in bytes (default: 16384) */

48

highWaterMark?: number;

49

50

/** Optional function to map input data */

51

map?: (data: any) => any;

52

53

/** Optional function to calculate byte size of data */

54

byteLength?: (data: any) => number;

55

56

/** AbortSignal that triggers destroy when aborted */

57

signal?: AbortSignal;

58

59

/** Shorthand for _write method */

60

write?: (data: any, cb: (err?: Error) => void) => void;

61

62

/** Shorthand for _writev method */

63

writev?: (batch: any[], cb: (err?: Error) => void) => void;

64

65

/** Shorthand for _final method */

66

final?: (cb: (err?: Error) => void) => void;

67

68

/** Shorthand for _open method */

69

open?: (cb: (err?: Error) => void) => void;

70

71

/** Shorthand for _destroy method */

72

destroy?: (cb: (err?: Error) => void) => void;

73

74

/** Shorthand for _predestroy method */

75

predestroy?: () => void;

76

}

77

```

78

79

**Usage Examples:**

80

81

```javascript

82

const { Writable } = require('streamx');

83

84

// Basic writable stream

85

const writable = new Writable({

86

write(data, cb) {

87

console.log('Received:', data.toString());

88

cb(); // Signal completion

89

}

90

});

91

92

// Write some data

93

writable.write('Hello, ');

94

writable.write('World!');

95

writable.end(); // End the stream

96

97

writable.on('finish', () => {

98

console.log('All writes completed');

99

});

100

101

// Writable with lifecycle hooks

102

const fileWriter = new Writable({

103

open(cb) {

104

console.log('Opening file for writing...');

105

// Open file or resource

106

cb();

107

},

108

109

write(data, cb) {

110

console.log('Writing:', data.toString());

111

// Write to file

112

cb();

113

},

114

115

final(cb) {

116

console.log('Finalizing writes...');

117

// Flush buffers, etc.

118

cb();

119

},

120

121

destroy(cb) {

122

console.log('Closing file...');

123

// Clean up resources

124

cb();

125

}

126

});

127

```

128

129

### Batch Writing

130

131

StreamX supports efficient batch writing through the `_writev` method.

132

133

```javascript { .api }

134

/**

135

* Override this method to implement batch write operations

136

* @param batch - Array of data items to write

137

* @param cb - Callback to signal completion

138

*/

139

_writev(batch: any[], cb: (err?: Error) => void): void;

140

```

141

142

**Batch Writing Example:**

143

144

```javascript

145

const batchWriter = new Writable({

146

writev(batch, cb) {

147

console.log(`Writing batch of ${batch.length} items:`);

148

batch.forEach((item, index) => {

149

console.log(` ${index}: ${item.toString()}`);

150

});

151

cb();

152

}

153

});

154

155

// Multiple writes will be batched automatically

156

batchWriter.write('item 1');

157

batchWriter.write('item 2');

158

batchWriter.write('item 3');

159

batchWriter.end();

160

```

161

162

### Static Methods

163

164

StreamX provides static utility methods for writable stream inspection and management.

165

166

```javascript { .api }

167

/**

168

* Check if a writable stream is under backpressure

169

* @param stream - The writable stream to check

170

* @returns True if the stream is backpressured

171

*/

172

static isBackpressured(stream: Writable): boolean;

173

174

/**

175

* Wait for a stream to drain the currently queued writes

176

* @param stream - The writable stream to wait for

177

* @returns Promise that resolves when drained or false if destroyed

178

*/

179

static drained(stream: Writable): Promise<boolean>;

180

```

181

182

**Static Method Examples:**

183

184

```javascript

185

const { Writable } = require('streamx');

186

187

const writable = new Writable({

188

write(data, cb) {

189

// Simulate slow writing

190

setTimeout(() => {

191

console.log('Written:', data.toString());

192

cb();

193

}, 100);

194

}

195

});

196

197

// Check backpressure

198

if (Writable.isBackpressured(writable)) {

199

console.log('Stream is backpressured, waiting...');

200

201

// Wait for drain

202

Writable.drained(writable).then((success) => {

203

if (success) {

204

console.log('Stream drained successfully');

205

} else {

206

console.log('Stream was destroyed');

207

}

208

});

209

}

210

211

// Write data

212

for (let i = 0; i < 10; i++) {

213

const canContinue = writable.write(`Message ${i}`);

214

if (!canContinue) {

215

console.log('Backpressure detected');

216

break;

217

}

218

}

219

```

220

221

### Events

222

223

Writable streams emit various events during their lifecycle.

224

225

```javascript { .api }

226

interface WritableEvents {

227

/** Emitted when the stream buffer is drained and ready for more writes */

228

'drain': () => void;

229

230

/** Emitted when all writes have been flushed after end() is called */

231

'finish': () => void;

232

233

/** Emitted when the stream has been fully closed */

234

'close': () => void;

235

236

/** Emitted when an error occurs */

237

'error': (err: Error) => void;

238

239

/** Emitted when a readable stream is piped to this writable */

240

'pipe': (src: Readable) => void;

241

}

242

```

243

244

### Properties

245

246

```javascript { .api }

247

interface WritableProperties {

248

/** Boolean indicating whether the stream has been destroyed */

249

destroyed: boolean;

250

}

251

```

252

253

### Advanced Configuration

254

255

StreamX writable streams support advanced configuration for specialized use cases.

256

257

**Backpressure Handling:**

258

259

```javascript

260

const writable = new Writable({

261

highWaterMark: 1024, // Small buffer for demonstration

262

263

write(data, cb) {

264

console.log('Writing:', data.toString());

265

// Simulate async write

266

setTimeout(cb, 10);

267

}

268

});

269

270

function writeWithBackpressure(data) {

271

const canContinue = writable.write(data);

272

273

if (!canContinue) {

274

console.log('Backpressure detected, waiting for drain...');

275

writable.once('drain', () => {

276

console.log('Stream drained, can continue writing');

277

});

278

}

279

280

return canContinue;

281

}

282

283

// Write data with backpressure handling

284

for (let i = 0; i < 100; i++) {

285

writeWithBackpressure(`Data chunk ${i}`);

286

}

287

```

288

289

**Data Transformation:**

290

291

```javascript

292

const transformWriter = new Writable({

293

map: (data) => {

294

// Transform data before writing

295

if (typeof data === 'string') {

296

return Buffer.from(data.toUpperCase());

297

}

298

return data;

299

},

300

301

write(data, cb) {

302

console.log('Transformed data:', data.toString());

303

cb();

304

}

305

});

306

307

transformWriter.write('hello world'); // Will be transformed to uppercase

308

```

309

310

**AbortSignal Integration:**

311

312

```javascript

313

const controller = new AbortController();

314

315

const writable = new Writable({

316

signal: controller.signal,

317

318

write(data, cb) {

319

// Check if aborted before processing

320

if (controller.signal.aborted) {

321

return cb(new Error('Aborted'));

322

}

323

324

console.log('Writing:', data.toString());

325

cb();

326

}

327

});

328

329

// Write some data

330

writable.write('test data');

331

332

// Abort after 1 second

333

setTimeout(() => {

334

controller.abort();

335

console.log('Write operation aborted');

336

}, 1000);

337

```

338

339

### Error Handling

340

341

StreamX provides comprehensive error handling with automatic cleanup.

342

343

```javascript

344

const errorProneWriter = new Writable({

345

write(data, cb) {

346

if (data.toString().includes('error')) {

347

// Pass error to callback

348

return cb(new Error('Write failed'));

349

}

350

351

console.log('Successfully wrote:', data.toString());

352

cb();

353

}

354

});

355

356

errorProneWriter.on('error', (err) => {

357

console.error('Stream error:', err.message);

358

});

359

360

errorProneWriter.on('close', () => {

361

console.log('Stream closed (cleanup completed)');

362

});

363

364

// This will trigger an error

365

errorProneWriter.write('This contains error');

366

```