or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

duplex.mdindex.mdreadable.mdstream.mdtransform.mdwritable.md

duplex.mddocs/

0

# Duplex Streams

1

2

Duplex streams that are both readable and writable, inheriting from Readable while implementing the complete Writable interface.

3

4

## Capabilities

5

6

### Duplex Constructor

7

8

Creates a new Duplex stream with configuration options for both readable and writable sides.

9

10

```javascript { .api }

11

/**

12

* Creates a new Duplex stream

13

* @param opts - Configuration options for both readable and writable functionality

14

*/

15

class Duplex extends Readable {

16

constructor(opts?: DuplexOptions);

17

}

18

19

interface DuplexOptions extends ReadableOptions {

20

/** Map function for readable side only */

21

mapReadable?: (data: any) => any;

22

/** ByteLength function for readable side only */

23

byteLengthReadable?: (data: any) => number;

24

/** Map function for writable side only */

25

mapWritable?: (data: any) => any;

26

/** ByteLength function for writable side only */

27

byteLengthWritable?: (data: any) => number;

28

/** Write function shorthand */

29

write?: (data: any, cb: (error?: Error) => void) => void;

30

/** Final function shorthand */

31

final?: (cb: (error?: Error) => void) => void;

32

}

33

```

34

35

**Usage Example:**

36

37

```javascript

38

const { Duplex } = require("@mafintosh/streamx");

39

40

const duplex = new Duplex({

41

read(cb) {

42

this.push(`read-${Date.now()}`);

43

cb(null);

44

},

45

write(data, cb) {

46

console.log("Writing:", data.toString());

47

cb(null);

48

}

49

});

50

```

51

52

### Readable Side

53

54

All methods and events from Readable streams are available:

55

56

#### Data Production Methods

57

58

```javascript { .api }

59

/**

60

* Called when stream wants new data for reading

61

* @param cb - Callback to call when read operation is complete

62

*/

63

_read(cb: (error?: Error) => void): void;

64

65

/**

66

* Push data to the readable buffer

67

* @param data - Data to push, or null to end readable side

68

* @returns True if buffer is not full

69

*/

70

push(data: any): boolean;

71

72

/**

73

* Read data from the readable buffer

74

* @returns Data from buffer, or null if empty

75

*/

76

read(): any;

77

```

78

79

### Writable Side

80

81

All methods and events from Writable streams are available:

82

83

#### Data Consumption Methods

84

85

```javascript { .api }

86

/**

87

* Called when stream wants to write data

88

* @param data - Data to write

89

* @param callback - Callback to call when write is complete

90

*/

91

_write(data: any, callback: (error?: Error) => void): void;

92

93

/**

94

* Write data to the stream

95

* @param data - Data to write

96

* @returns True if buffer is not full

97

*/

98

write(data: any): boolean;

99

100

/**

101

* End the writable side gracefully

102

* @param data - Optional final data to write before ending

103

*/

104

end(data?: any): void;

105

106

/**

107

* Called before finish event for cleanup

108

* @param callback - Callback to call when final is complete

109

*/

110

_final(callback: (error?: Error) => void): void;

111

```

112

113

### Combined Usage

114

115

**Echo Server Example:**

116

117

```javascript

118

const { Duplex } = require("@mafintosh/streamx");

119

120

class EchoStream extends Duplex {

121

constructor() {

122

super();

123

this.buffer = [];

124

}

125

126

_read(cb) {

127

if (this.buffer.length > 0) {

128

this.push(this.buffer.shift());

129

}

130

cb(null);

131

}

132

133

_write(data, cb) {

134

// Echo written data back to readable side

135

this.buffer.push(`Echo: ${data}`);

136

this.push(`Echo: ${data}`);

137

cb(null);

138

}

139

}

140

141

const echo = new EchoStream();

142

143

// Write data

144

echo.write("Hello");

145

echo.write("World");

146

147

// Read echoed data

148

echo.on('data', (chunk) => {

149

console.log('Received:', chunk.toString());

150

});

151

152

echo.end();

153

```

154

155

**Proxy Stream Example:**

156

157

```javascript

158

const { Duplex } = require("@mafintosh/streamx");

159

160

class ProxyStream extends Duplex {

161

constructor(target) {

162

super();

163

this.target = target;

164

165

// Forward data from target to our readable side

166

this.target.on('data', (chunk) => {

167

this.push(chunk);

168

});

169

170

this.target.on('end', () => {

171

this.push(null);

172

});

173

}

174

175

_write(data, cb) {

176

// Forward written data to target

177

this.target.write(data);

178

cb(null);

179

}

180

181

_final(cb) {

182

this.target.end();

183

cb(null);

184

}

185

}

186

```

187

188

**Bidirectional Transform Example:**

189

190

```javascript

191

const { Duplex } = require("@mafintosh/streamx");

192

193

class BidirectionalTransform extends Duplex {

194

constructor() {

195

super();

196

this.readCounter = 0;

197

this.writeCounter = 0;

198

}

199

200

_read(cb) {

201

// Generate data for reading

202

if (this.readCounter < 5) {

203

this.push(`generated-${this.readCounter++}`);

204

} else {

205

this.push(null);

206

}

207

cb(null);

208

}

209

210

_write(data, cb) {

211

// Process written data

212

const processed = data.toString().toUpperCase();

213

console.log(`Processed write #${this.writeCounter++}:`, processed);

214

cb(null);

215

}

216

}

217

218

const transform = new BidirectionalTransform();

219

220

// Read generated data

221

transform.on('data', (chunk) => {

222

console.log('Read:', chunk.toString());

223

});

224

225

// Write data to be processed

226

transform.write("hello");

227

transform.write("world");

228

transform.end();

229

```

230

231

### Events

232

233

Duplex streams emit events from both Readable and Writable:

234

235

#### Readable Events

236

- **`readable`** - Data available to read

237

- **`data`** - Data chunk read (auto-resumes stream)

238

- **`end`** - Readable side ended

239

240

#### Writable Events

241

- **`drain`** - Buffer drained, safe to write more

242

- **`finish`** - All writes completed

243

244

#### Shared Events

245

- **`close`** - Stream fully closed

246

- **`error`** - Error occurred

247

248

**Complete Event Handling Example:**

249

250

```javascript

251

const { Duplex } = require("@mafintosh/streamx");

252

253

const duplex = new Duplex({

254

read(cb) {

255

this.push(`data-${Date.now()}`);

256

setTimeout(() => cb(null), 100);

257

},

258

write(data, cb) {

259

console.log("Writing:", data.toString());

260

setTimeout(() => cb(null), 50);

261

}

262

});

263

264

// Readable events

265

duplex.on('readable', () => {

266

console.log('Readable event');

267

});

268

269

duplex.on('data', (chunk) => {

270

console.log('Data:', chunk.toString());

271

});

272

273

duplex.on('end', () => {

274

console.log('Readable end');

275

});

276

277

// Writable events

278

duplex.on('drain', () => {

279

console.log('Drain event');

280

});

281

282

duplex.on('finish', () => {

283

console.log('Writable finish');

284

});

285

286

// Shared events

287

duplex.on('close', () => {

288

console.log('Stream closed');

289

});

290

291

duplex.on('error', (err) => {

292

console.error('Error:', err);

293

});

294

295

// Use both sides

296

duplex.write("test data");

297

duplex.end();

298

```

299

300

### Advanced Configuration

301

302

**Separate Readable/Writable Configuration:**

303

304

```javascript

305

const duplex = new Duplex({

306

// Readable configuration

307

highWaterMark: 8192,

308

mapReadable: (data) => data.toString().toUpperCase(),

309

byteLengthReadable: (data) => Buffer.byteLength(data),

310

311

// Writable configuration

312

mapWritable: (data) => Buffer.from(data),

313

byteLengthWritable: (data) => data.length,

314

315

// Implementation

316

read(cb) {

317

this.push("readable data");

318

cb(null);

319

},

320

321

write(data, cb) {

322

console.log("Wrote:", data);

323

cb(null);

324

}

325

});

326

```