or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

duplex.mdindex.mdreadable.mdstream.mdtransform.mdwritable.md

readable.mddocs/

0

# Readable Streams

1

2

Readable stream implementation providing data flow control, buffer management, and pipe operations with enhanced error handling.

3

4

## Capabilities

5

6

### Readable Constructor

7

8

Creates a new Readable stream with configuration options.

9

10

```javascript { .api }

11

/**

12

* Creates a new Readable stream

13

* @param opts - Configuration options for the readable stream

14

*/

15

class Readable extends Stream {

16

constructor(opts?: ReadableOptions);

17

}

18

19

interface ReadableOptions extends StreamOptions {

20

/** Maximum buffer size in bytes (default: 16384) */

21

highWaterMark?: number;

22

/** Function to map input data */

23

map?: (data: any) => any;

24

/** Function to calculate byte size of data */

25

byteLength?: (data: any) => number;

26

/** Read function shorthand */

27

read?: (cb: (error?: Error) => void) => void;

28

}

29

```

30

31

**Usage Example:**

32

33

```javascript

34

const { Readable } = require("@mafintosh/streamx");

35

36

const readable = new Readable({

37

highWaterMark: 8192,

38

read(cb) {

39

this.push("Hello World");

40

cb(null);

41

}

42

});

43

```

44

45

### Data Production

46

47

#### _read Method

48

49

Called when the stream wants new data. Override to implement data reading logic.

50

51

```javascript { .api }

52

/**

53

* Called when stream wants new data

54

* @param cb - Callback to call when read operation is complete

55

*/

56

_read(cb: (error?: Error) => void): void;

57

```

58

59

**Usage Example:**

60

61

```javascript

62

class CustomReadable extends Readable {

63

constructor() {

64

super();

65

this.counter = 0;

66

}

67

68

_read(cb) {

69

if (this.counter < 5) {

70

this.push(`data-${this.counter++}`);

71

} else {

72

this.push(null); // End stream

73

}

74

cb(null);

75

}

76

}

77

```

78

79

#### push Method

80

81

Pushes data to the stream buffer.

82

83

```javascript { .api }

84

/**

85

* Push data to stream buffer

86

* @param data - Data to push, or null to end stream

87

* @returns True if buffer is not full and more data can be pushed

88

*/

89

push(data: any): boolean;

90

```

91

92

**Usage Example:**

93

94

```javascript

95

const readable = new Readable({

96

read(cb) {

97

const shouldContinue = this.push("some data");

98

if (shouldContinue) {

99

this.push("more data");

100

}

101

this.push(null); // End stream

102

cb(null);

103

}

104

});

105

```

106

107

### Data Consumption

108

109

#### read Method

110

111

Reads data from the stream buffer.

112

113

```javascript { .api }

114

/**

115

* Read data from stream buffer

116

* @returns Data from buffer, or null if buffer is empty or stream ended

117

*/

118

read(): any;

119

```

120

121

**Usage Example:**

122

123

```javascript

124

const readable = new Readable({

125

read(cb) {

126

this.push("Hello");

127

this.push("World");

128

this.push(null);

129

cb(null);

130

}

131

});

132

133

readable.on('readable', () => {

134

let chunk;

135

while ((chunk = readable.read()) !== null) {

136

console.log('Read:', chunk);

137

}

138

});

139

```

140

141

#### unshift Method

142

143

Adds data to the front of the buffer (useful for putting back over-read data).

144

145

```javascript { .api }

146

/**

147

* Add data to front of buffer

148

* @param data - Data to add to front of buffer

149

*/

150

unshift(data: any): void;

151

```

152

153

**Usage Example:**

154

155

```javascript

156

const readable = new Readable({

157

read(cb) {

158

this.push("Hello World");

159

this.push(null);

160

cb(null);

161

}

162

});

163

164

readable.on('readable', () => {

165

const data = readable.read();

166

if (data === "Hello World") {

167

// Put it back for later processing

168

readable.unshift(data);

169

}

170

});

171

```

172

173

### Flow Control

174

175

#### pause Method

176

177

Pauses the stream (only needed if stream is resumed).

178

179

```javascript { .api }

180

/**

181

* Pause the stream

182

*/

183

pause(): void;

184

```

185

186

#### resume Method

187

188

Resumes/starts consuming the stream as fast as possible.

189

190

```javascript { .api }

191

/**

192

* Resume consuming the stream

193

*/

194

resume(): void;

195

```

196

197

**Usage Example:**

198

199

```javascript

200

const readable = new Readable({

201

read(cb) {

202

this.push(`data-${Date.now()}`);

203

cb(null);

204

}

205

});

206

207

// Start consuming

208

readable.resume();

209

210

// Pause after 1 second

211

setTimeout(() => {

212

readable.pause();

213

}, 1000);

214

215

// Resume after another second

216

setTimeout(() => {

217

readable.resume();

218

}, 2000);

219

```

220

221

### Pipe Operations

222

223

#### pipe Method

224

225

Efficiently pipes the readable stream to a writable stream with error handling.

226

227

```javascript { .api }

228

/**

229

* Pipe readable stream to writable stream

230

* @param destination - Writable stream to pipe to

231

* @param callback - Optional callback called when pipeline completes

232

* @returns The destination stream

233

*/

234

pipe(destination: Writable, callback?: (error?: Error) => void): Writable;

235

```

236

237

**Usage Example:**

238

239

```javascript

240

const { Readable, Writable } = require("@mafintosh/streamx");

241

242

const readable = new Readable({

243

read(cb) {

244

this.push("Hello World");

245

this.push(null);

246

cb(null);

247

}

248

});

249

250

const writable = new Writable({

251

write(data, cb) {

252

console.log("Received:", data.toString());

253

cb(null);

254

}

255

});

256

257

readable.pipe(writable, (err) => {

258

if (err) console.error("Pipeline failed:", err);

259

else console.log("Pipeline completed successfully");

260

});

261

```

262

263

### Events

264

265

#### readable Event

266

267

Emitted when data is available in the buffer and buffer was previously empty.

268

269

```javascript

270

readable.on('readable', () => {

271

// Data is available to read

272

});

273

```

274

275

#### data Event

276

277

Emitted when data is being read from the stream. Attaching this event automatically resumes the stream.

278

279

```javascript

280

readable.on('data', (chunk) => {

281

console.log('Received chunk:', chunk);

282

});

283

```

284

285

#### end Event

286

287

Emitted when the stream has ended and no more data is available.

288

289

```javascript

290

readable.on('end', () => {

291

console.log('Stream ended');

292

});

293

```

294

295

#### close Event

296

297

Emitted when the stream has fully closed.

298

299

```javascript

300

readable.on('close', () => {

301

console.log('Stream closed');

302

});

303

```

304

305

#### error Event

306

307

Emitted when an error occurs.

308

309

```javascript

310

readable.on('error', (err) => {

311

console.error('Stream error:', err);

312

});

313

```

314

315

**Complete Usage Example:**

316

317

```javascript

318

const { Readable } = require("@mafintosh/streamx");

319

320

const readable = new Readable({

321

read(cb) {

322

// Simulate reading data

323

setTimeout(() => {

324

if (Math.random() > 0.8) {

325

this.push(null); // End stream

326

} else {

327

this.push(`data-${Date.now()}`);

328

}

329

cb(null);

330

}, 100);

331

}

332

});

333

334

readable.on('data', (chunk) => {

335

console.log('Data:', chunk);

336

});

337

338

readable.on('end', () => {

339

console.log('Stream ended');

340

});

341

342

readable.on('close', () => {

343

console.log('Stream closed');

344

});

345

346

readable.on('error', (err) => {

347

console.error('Error:', err);

348

});

349

```