or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

duplex.md, index.md, readable.md, stream.md, transform.md, writable.md

docs/writable.md

0

# Writable Streams

1

2

Writable stream implementation providing data consumption, buffering, and backpressure management with proper lifecycle handling.

3

4

## Capabilities

5

6

### Writable Constructor

7

8

Creates a new Writable stream with configuration options.

9

10

```javascript { .api }

11

/**

12

* Creates a new Writable stream

13

* @param opts - Configuration options for the writable stream

14

*/

15

class Writable extends Stream {

16

constructor(opts?: WritableOptions);

17

}

18

19

interface WritableOptions extends StreamOptions {

20

/** Maximum buffer size in bytes (default: 16384) */

21

highWaterMark?: number;

22

/** Function to map input data */

23

map?: (data: any) => any;

24

/** Function to calculate byte size of data */

25

byteLength?: (data: any) => number;

26

/** Write function shorthand */

27

write?: (data: any, cb: (error?: Error) => void) => void;

28

/** Final function shorthand */

29

final?: (cb: (error?: Error) => void) => void;

30

}

31

```

32

33

**Usage Example:**

34

35

```javascript

36

const { Writable } = require("@mafintosh/streamx");

37

38

const writable = new Writable({

39

write(data, cb) {

40

console.log("Writing:", data.toString());

41

cb(null);

42

}

43

});

44

```

45

46

### Data Writing

47

48

#### _write Method

49

50

Called when the stream wants to write data. Override to implement write logic.

51

52

```javascript { .api }

53

/**

54

* Called when stream wants to write data

55

* @param data - Data to write

56

* @param callback - Callback to call when write operation is complete

57

*/

58

_write(data: any, callback: (error?: Error) => void): void;

59

```

60

61

**Usage Example:**

62

63

```javascript

64

const fs = require('fs');

65

66

class FileWritable extends Writable {

67

constructor(filename) {

68

super();

69

this.filename = filename;

70

this.fd = null;

71

}

72

73

_open(cb) {

74

fs.open(this.filename, 'w', (err, fd) => {

75

if (err) return cb(err);

76

this.fd = fd;

77

cb(null);

78

});

79

}

80

81

_write(data, cb) {

82

fs.write(this.fd, data, (err) => {

83

if (err) return cb(err);

84

cb(null);

85

});

86

}

87

88

_destroy(cb) {

89

if (this.fd) {

90

fs.close(this.fd, cb);

91

} else {

92

cb(null);

93

}

94

}

95

}

96

```

97

98

#### write Method

99

100

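Writes data to the stream. Returns `true` if the buffer is not full, or `false` if backpressure should be applied and the caller should wait for the `drain` event before writing more.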

101

102

```javascript { .api }

103

/**

104

* Write data to the stream

105

* @param data - Data to write

106

* @returns True if buffer is not full, false if backpressure should be applied

107

*/

108

write(data: any): boolean;

109

```

110

111

**Usage Example:**

112

113

```javascript

114

const writable = new Writable({

115

write(data, cb) {

116

console.log("Received:", data.toString());

117

cb(null);

118

}

119

});

120

121

const canContinue = writable.write("Hello World");

122

if (!canContinue) {

123

// Wait for 'drain' event before writing more

124

writable.once('drain', () => {

125

writable.write("More data");

126

});

127

}

128

```

129

130

### Stream Finalization

131

132

#### end Method

133

134

Ends the writable stream gracefully, optionally writing one final chunk of data before ending.

135

136

```javascript { .api }

137

/**

138

* End the writable stream gracefully

139

* @param data - Optional final data to write before ending

140

*/

141

end(data?: any): void;

142

```

143

144

**Usage Example:**

145

146

```javascript

147

const writable = new Writable({

148

write(data, cb) {

149

console.log("Writing:", data.toString());

150

cb(null);

151

}

152

});

153

154

writable.write("First chunk");

155

writable.write("Second chunk");

156

writable.end("Final chunk"); // Write final data and end

157

158

// Or end without final data

159

// writable.end();

160

161

writable.on('finish', () => {

162

console.log('All writes completed');

163

});

164

```

165

166

#### _final Method

167

168

Called just before the 'finish' event when all writes have been processed.

169

170

```javascript { .api }

171

/**

172

* Called before finish event for final cleanup

173

* @param callback - Callback to call when final is complete

174

*/

175

_final(callback: (error?: Error) => void): void;

176

```

177

178

**Usage Example:**

179

180

```javascript

181

class BufferedWritable extends Writable {

182

constructor() {

183

super();

184

this.buffer = [];

185

}

186

187

_write(data, cb) {

188

this.buffer.push(data);

189

cb(null);

190

}

191

192

_final(cb) {

193

// Final processing of remaining buffer

194

console.log("Final processing buffer:", this.buffer);

195

this.buffer = [];

196

cb(null);

197

}

198

}

199

```

200

201

### Events

202

203

#### drain Event

204

205

Emitted when the buffer has been drained after being full.

206

207

```javascript

208

writable.on('drain', () => {

209

// Buffer is no longer full, safe to write more

210

});

211

```

212

213

#### finish Event

214

215

Emitted when the stream has been ended and all writes have been processed.

216

217

```javascript

218

writable.on('finish', () => {

219

console.log('All writes completed');

220

});

221

```

222

223

#### close Event

224

225

Emitted when the stream has fully closed.

226

227

```javascript

228

writable.on('close', () => {

229

console.log('Stream closed');

230

});

231

```

232

233

#### error Event

234

235

Emitted when an error occurs.

236

237

```javascript

238

writable.on('error', (err) => {

239

console.error('Stream error:', err);

240

});

241

```

242

243

### Backpressure Handling

244

245

**Complete Usage Example:**

246

247

```javascript

248

const { Writable } = require("@mafintosh/streamx");

249

250

const writable = new Writable({

251

highWaterMark: 1024, // 1KB buffer

252

write(data, cb) {

253

// Simulate slow write operation

254

setTimeout(() => {

255

console.log("Wrote:", data.toString());

256

cb(null);

257

}, 100);

258

},

259

final(cb) {

260

console.log("Final processing of any remaining data");

261

cb(null);

262

}

263

});

264

265

function writeData(data) {

266

const canContinue = writable.write(data);

267

if (!canContinue) {

268

console.log("Buffer full, waiting for drain...");

269

writable.once('drain', () => {

270

console.log("Buffer drained, can continue writing");

271

});

272

}

273

}

274

275

// Write some data

276

writeData("First chunk");

277

writeData("Second chunk");

278

writeData("Third chunk");

279

280

// End the stream

281

writable.end();

282

283

writable.on('finish', () => {

284

console.log('All writes completed');

285

});

286

287

writable.on('close', () => {

288

console.log('Stream closed');

289

});

290

291

writable.on('error', (err) => {

292

console.error('Error:', err);

293

});

294

```

295

296

### Integration with File System

297

298

```javascript

299

const fs = require('fs');

300

const { Writable } = require("@mafintosh/streamx");

301

302

class FileWriter extends Writable {

303

constructor(filename) {

304

super();

305

this.filename = filename;

306

this.fd = null;

307

}

308

309

_open(cb) {

310

fs.open(this.filename, 'w', (err, fd) => {

311

if (err) return cb(err);

312

this.fd = fd;

313

cb(null);

314

});

315

}

316

317

_write(data, cb) {

318

if (!Buffer.isBuffer(data)) {

319

data = Buffer.from(data);

320

}

321

fs.write(this.fd, data, 0, data.length, null, (err) => {

322

if (err) return cb(err);

323

cb(null);

324

});

325

}

326

327

_destroy(cb) {

328

if (this.fd) {

329

fs.close(this.fd, cb);

330

} else {

331

cb(null);

332

}

333

}

334

}

335

336

// Usage

337

const writer = new FileWriter('output.txt');

338

writer.write('Hello ');

339

writer.write('World!');

340

writer.end();

341

```