
# Readable Streams

Readable streams in StreamX provide enhanced lifecycle management, proper backpressure handling, and improved error handling compared to Node.js core streams. They support both flowing and non-flowing modes with consistent behavior.

## Capabilities

### Readable Class

Creates a readable stream with enhanced lifecycle support and proper resource management.

```javascript { .api }
/**
 * Creates a new readable stream with enhanced lifecycle support
 * @param options - Configuration options for the readable stream
 */
class Readable extends Stream {
  constructor(options?: ReadableOptions);

  /** Override this method to implement custom read logic */
  _read(cb: () => void): void;

  /** Lifecycle hook called before the first read operation */
  _open(cb: (err?: Error) => void): void;

  /** Cleanup hook called when the stream is destroyed */
  _destroy(cb: (err?: Error) => void): void;

  /** Hook called immediately when destroy() is first invoked */
  _predestroy(): void;

  /** Push data to the stream buffer */
  push(data: any): boolean;

  /** Read data from the stream buffer */
  read(): any;

  /** Add data to the front of the buffer */
  unshift(data: any): void;

  /** Pause the stream */
  pause(): Readable;

  /** Resume the stream */
  resume(): Readable;

  /** Pipe to a writable stream with callback support */
  pipe(destination: Writable, callback?: (err?: Error) => void): Writable;

  /** Set text encoding for automatic string decoding */
  setEncoding(encoding: string): Readable;

  /** Forcefully destroy the stream */
  destroy(err?: Error): void;
}

interface ReadableOptions {
  /** Maximum buffer size in bytes (default: 16384) */
  highWaterMark?: number;

  /** Optional function to map input data */
  map?: (data: any) => any;

  /** Optional function to calculate byte size of data */
  byteLength?: (data: any) => number;

  /** AbortSignal that triggers destroy when aborted */
  signal?: AbortSignal;

  /** Eagerly open the stream (default: false) */
  eagerOpen?: boolean;

  /** Shorthand for _read method */
  read?: (cb: () => void) => void;

  /** Shorthand for _open method */
  open?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _destroy method */
  destroy?: (cb: (err?: Error) => void) => void;

  /** Shorthand for _predestroy method */
  predestroy?: () => void;

  /** Text encoding for automatic string decoding */
  encoding?: string;
}
```

**Usage Examples:**

```javascript
const { Readable } = require('streamx');

// Basic readable stream
const readable = new Readable({
  read(cb) {
    // Push some data
    this.push('Hello, ');
    this.push('World!');
    this.push(null); // End the stream
    cb();
  }
});

// Listen for data
readable.on('data', (chunk) => {
  console.log('Received:', chunk.toString());
});

readable.on('end', () => {
  console.log('Stream ended');
});

// Readable with lifecycle hooks
const fileReader = new Readable({
  open(cb) {
    console.log('Opening file...');
    // Open file or resource
    cb();
  },

  read(cb) {
    // Read from file
    this.push('File content...');
    this.push(null);
    cb();
  },

  destroy(cb) {
    console.log('Closing file...');
    // Clean up resources
    cb();
  }
});
```

### Static Methods

StreamX provides static utility methods for readable stream inspection and manipulation.

```javascript { .api }
/**
 * Check if a readable stream is currently paused
 * @param stream - The readable stream to check
 * @returns True if the stream is paused
 */
static isPaused(stream: Readable): boolean;

/**
 * Check if a readable stream is under backpressure
 * @param stream - The readable stream to check
 * @returns True if the stream is backpressured
 */
static isBackpressured(stream: Readable): boolean;

/**
 * Create a readable stream from various data sources
 * @param source - Array, buffer, string, or async iterator
 * @returns New readable stream
 */
static from(source: any[] | Buffer | string | AsyncIterable<any>): Readable;
```

**Static Method Examples:**

```javascript
const { Readable } = require('streamx');

// Create from array
const arrayStream = Readable.from([1, 2, 3, 4, 5]);

// Create from string
const stringStream = Readable.from('Hello World');

// Create from async iterator
async function* generator() {
  yield 'first';
  yield 'second';
  yield 'third';
}
const iteratorStream = Readable.from(generator());

// Check stream state
if (Readable.isPaused(arrayStream)) {
  console.log('Stream is paused');
}

if (Readable.isBackpressured(arrayStream)) {
  console.log('Stream is under backpressure');
}
```

### Events

Readable streams emit various events during their lifecycle.

```javascript { .api }
interface ReadableEvents {
  /** Emitted when data is available to read */
  'readable': () => void;

  /** Emitted when data is being read from the stream */
  'data': (chunk: any) => void;

  /** Emitted when the stream has ended and no more data is available */
  'end': () => void;

  /** Emitted when the stream has been fully closed */
  'close': () => void;

  /** Emitted when an error occurs */
  'error': (err: Error) => void;

  /** Emitted when the stream is piped to a destination */
  'piping': (dest: Writable) => void;
}
```

### Properties

```javascript { .api }
interface ReadableProperties {
  /** Boolean indicating whether the stream has been destroyed */
  destroyed: boolean;
}
```

### Advanced Configuration

StreamX readable streams support advanced configuration for specialized use cases.

**Map Function:**

Transform data as it's pushed to the stream:

```javascript
const { Readable } = require('streamx');

const readable = new Readable({
  map: (data) => {
    // Transform strings to uppercase
    return typeof data === 'string' ? data.toUpperCase() : data;
  },

  read(cb) {
    this.push('hello');
    this.push('world');
    this.push(null);
    cb();
  }
});
```

**ByteLength Function:**

Custom byte length calculation for backpressure:

```javascript
const { Readable } = require('streamx');

const readable = new Readable({
  byteLength: (data) => {
    // Custom size calculation
    // Buffer.byteLength counts encoded bytes; string.length counts UTF-16 code units
    if (typeof data === 'string') return Buffer.byteLength(data);
    if (Buffer.isBuffer(data)) return data.length;
    return 1024; // Flat estimate for objects
  },

  highWaterMark: 8192, // 8KB buffer

  read(cb) {
    this.push({ large: 'object' });
    cb();
  }
});
```

**AbortSignal Integration:**

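```javascript
const { Readable } = require('streamx');

const controller = new AbortController();

const readable = new Readable({
  signal: controller.signal,

  read(cb) {
    // Simulate a slow source; the abort below fires before this timer does
    setTimeout(() => {
      if (!controller.signal.aborted) this.push('data');
      // Always complete the read so a pending destroy is not left waiting
      cb();
    }, 1000);
  }
});

readable.on('data', (chunk) => console.log('Received:', chunk));

// Aborting destroys the stream; a listener keeps any resulting error from being thrown
readable.on('error', (err) => console.error('Stream stopped:', err.message));

// Cancel the stream after 500ms
setTimeout(() => controller.abort(), 500);
```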