or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

callback-api.md index.md stream-api.md sync-api.md

stream-api.mddocs/

0

# Stream API

1

2

Core streaming transformation functionality for scalable data processing. The stream API extends Node.js `stream.Transform` to provide object transformations with full backpressure support and event-driven processing.

3

4

## Capabilities

5

6

### Transform Function

7

8

Creates a transform stream for processing records with configurable options and handler functions.

9

10

```javascript { .api }

11

/**

12

* Create a transform stream with handler function

13

* @param handler - Function to transform each record

14

* @param callback - Optional completion callback for auto-consumption

15

* @returns Transformer stream instance

16

*/

17

function transform<T, U>(handler: Handler<T, U>, callback?: Callback): Transformer;

18

19

/**

20

* Create a transform stream with options and handler

21

* @param options - Configuration options for the transformer

22

* @param handler - Function to transform each record

23

* @param callback - Optional completion callback for auto-consumption

24

* @returns Transformer stream instance

25

*/

26

function transform<T, U>(options: Options, handler: Handler<T, U>, callback?: Callback): Transformer;

27

```

28

29

**Usage Examples:**

30

31

```javascript

32

import { transform } from "stream-transform";

33

import { createReadStream, createWriteStream } from "fs";

34

35

// Basic stream transformation

36

const transformer = transform((record) => {

37

return record.map(field => field.toUpperCase());

38

});

39

40

// Stream with options

41

const optionsTransformer = transform({

42

parallel: 50,

43

params: { prefix: "processed_" }

44

}, (record, params) => {

45

return params.prefix + record.join(",");

46

});

47

48

// Auto-consumption with callback

49

const autoConsumed = transform((record) => record, (err, results) => {

50

if (err) throw err;

51

console.log(`Processed ${results.length} records`);

52

});

53

54

// Pipe data through transformer

55

createReadStream("input.csv")

56

.pipe(csvParser())

57

.pipe(transformer)

58

.pipe(createWriteStream("output.csv"));

59

```

60

61

### Transformer Class

62

63

Transform stream class that extends Node.js `stream.Transform` with additional state tracking and configuration.

64

65

```javascript { .api }

66

/**

67

* Transform stream class for data processing

68

*/

69

class Transformer extends stream.Transform {

70

/**

71

* Create a new Transformer instance

72

* @param options - Configuration options

73

*/

74

constructor(options: Options);

75

76

/** Configuration options (read-only) */

77

readonly options: Options;

78

79

/** Current transformation state (read-only) */

80

readonly state: State;

81

}

82

```

83

84

**Properties:**

85

86

- `options`: Immutable configuration object containing transformer settings

87

- `state`: Real-time statistics about transformation progress

88

89

**Usage Examples:**

90

91

```javascript

92

import { transform } from "stream-transform";

93

94

const transformer = transform({

95

parallel: 10,

96

consume: true,

97

params: { multiplier: 2 }

98

}, (record, params) => record * params.multiplier);

99

100

// Access configuration

101

console.log(transformer.options.parallel); // 10

102

console.log(transformer.options.params.multiplier); // 2

103

104

// Monitor progress

105

transformer.on('readable', () => {

106

console.log(`Progress: ${transformer.state.finished}/${transformer.state.started}`);

107

});

108

```

109

110

### Stream Events

111

112

Transformer streams emit standard Node.js stream events plus custom events for monitoring.

113

114

```javascript { .api }

115

// Standard stream events

116

transformer.on('readable', () => { /* Data available for reading */ });

117

transformer.on('end', () => { /* No more data to be read from the stream */ });

118

transformer.on('finish', () => { /* end() was called and all written data has been processed */ });

119

transformer.on('error', (err) => { /* Error occurred during processing */ });

120

transformer.on('close', () => { /* Stream has been destroyed */ });

121

122

// Pipe events (emitted when a source stream is piped into or unpiped from the transformer)

123

transformer.on('pipe', (src) => { /* Stream was piped from src */ });

124

transformer.on('unpipe', (src) => { /* Stream was unpiped from src */ });

125

```

126

127

**Usage Examples:**

128

129

```javascript

130

import { transform } from "stream-transform";

131

132

const transformer = transform((record) => processRecord(record));

133

134

// Handle events

135

transformer.on('readable', function() {

136

let record;

137

while ((record = this.read()) !== null) {

138

console.log('Processed:', record);

139

}

140

});

141

142

transformer.on('error', (err) => {

143

console.error('Transformation error:', err);

144

});

145

146

transformer.on('finish', () => {

147

console.log('All records processed');

148

console.log(`Final stats: ${transformer.state.finished} completed`);

149

});

150

```

151

152

### Handler Functions

153

154

User-defined transformation functions that process individual records. Supports multiple execution patterns.

155

156

```javascript { .api }

157

/**

158

* Synchronous handler - returns result directly

159

*/

160

type SyncHandler<T, U> = (record: T, params?: any) => U;

161

162

/**

163

* Asynchronous handler - uses callback for result

164

*/

165

type AsyncHandler<T, U> = (record: T, callback: HandlerCallback<U>, params?: any) => void;

166

167

/**

168

* Promise-based handler - returns Promise of result

169

*/

170

type PromiseHandler<T, U> = (record: T, params?: any) => Promise<U>;

171

172

/**

173

* Generic handler type (union of all patterns)

174

*/

175

type Handler<T, U> = SyncHandler<T, U> | AsyncHandler<T, U> | PromiseHandler<T, U>;

176

177

/**

178

* Callback function for asynchronous handlers

179

*/

180

type HandlerCallback<T = any> = (err?: null | Error, record?: T) => void;

181

```

182

183

**Handler Detection:**

184

185

The library automatically detects handler type based on function signature:

186

187

- **1 parameter** (+ optional params): Synchronous handler

188

- **2 parameters** (+ optional params): Asynchronous handler (second param is callback)

189

- **Return value with `.then` method**: Promise-based handler

190

191

**Usage Examples:**

192

193

```javascript

194

import { transform } from "stream-transform";

195

196

// Synchronous handler

197

const syncTransformer = transform((record) => {

198

return record.map(field => field.trim());

199

});

200

201

// Asynchronous handler

202

const asyncTransformer = transform((record, callback) => {

203

setTimeout(() => {

204

callback(null, record.join("|"));

205

}, 10);

206

});

207

208

// Promise-based handler

209

const promiseTransformer = transform(async (record) => {

210

const result = await processAsync(record);

211

return result;

212

});

213

214

// Handler with params

215

const paramsTransformer = transform({

216

params: { separator: "|", prefix: "row_" }

217

}, (record, params) => {

218

return params.prefix + record.join(params.separator);

219

});

220

```

221

222

### Configuration Options

223

224

Configuration object for customizing transformer behavior.

225

226

```javascript { .api }

227

interface Options extends stream.TransformOptions {

228

/**

229

* Auto-consume stream when no consumer is present

230

* @default false

231

*/

232

consume?: boolean;

233

234

/**

235

* Number of parallel transformation callbacks (async handlers only)

236

* @default 100

237

*/

238

parallel?: number;

239

240

/**

241

* User-defined parameters passed to handler function

242

* @default null

243

*/

244

params?: any;

245

}

246

```

247

248

**Usage Examples:**

249

250

```javascript

251

import { transform } from "stream-transform";

252

253

// Auto-consumption for standalone processing

254

const autoConsumer = transform({

255

consume: true

256

}, (record) => processRecord(record));

257

258

// High concurrency for I/O-bound operations

259

const highConcurrency = transform({

260

parallel: 500

261

}, async (record) => {

262

return await fetchDataForRecord(record);

263

});

264

265

// Custom parameters

266

const withParams = transform({

267

params: {

268

apiKey: process.env.API_KEY,

269

timeout: 5000

270

}

271

}, (record, params) => {

272

return enrichRecord(record, params.apiKey, params.timeout);

273

});

274

275

// Inherit stream options

276

const customStream = transform({

277

highWaterMark: 64 * 1024,

278

objectMode: true // automatically set by transform

279

}, (record) => record);

280

```

281

282

### State Monitoring

283

284

Real-time statistics about transformation progress and performance.

285

286

```javascript { .api }

287

interface State {

288

/** Number of transformations that have completed successfully */

289

finished: number;

290

291

/** Number of transformations currently being processed */

292

running: number;

293

294

/** Total number of transformations that have been started */

295

started: number;

296

297

/** Whether the stream is currently paused due to backpressure */

298

paused: boolean;

299

}

300

```

301

302

**Usage Examples:**

303

304

```javascript

305

import { transform } from "stream-transform";

306

307

const transformer = transform((record) => processRecord(record));

308

309

// Monitor progress

310

const progressInterval = setInterval(() => {

311

const { started, running, finished } = transformer.state;

312

const completion = started ? (finished / started * 100).toFixed(1) : 0;

313

console.log(`Progress: ${completion}% (${finished}/${started}, ${running} running)`);

314

}, 1000);

315

316

transformer.on('finish', () => {

317

clearInterval(progressInterval);

318

console.log(`Final: ${transformer.state.finished} records processed`);

319

});

320

321

// Detect processing bottlenecks

322

transformer.on('readable', () => {

323

if (transformer.state.running > transformer.options.parallel * 0.8) {

324

console.warn('High concurrency usage detected');

325

}

326

});

327

```

328

329

### Error Handling

330

331

Comprehensive error handling for stream processing with proper cleanup and recovery.

332

333

```javascript { .api }

334

// Errors are emitted as 'error' events

335

transformer.on('error', (err: Error) => void);

336

337

// Handler errors automatically destroy the stream

338

// Callback-based handlers: call callback(error)

339

// Sync handlers: throw error

340

// Promise handlers: reject promise

341

```

342

343

**Usage Examples:**

344

345

```javascript

346

import { transform } from "stream-transform";

347

348

const transformer = transform((record) => {

349

if (!record || record.length === 0) {

350

throw new Error('Invalid record: empty or null');

351

}

352

return processRecord(record);

353

});

354

355

// Handle stream errors

356

transformer.on('error', (err) => {

357

console.error('Transform error:', err.message);

358

// Stream is automatically destroyed

359

cleanup();

360

});

361

362

// Async handler error handling

363

const asyncTransformer = transform((record, callback) => {

364

processRecordAsync(record, (err, result) => {

365

if (err) {

366

return callback(err); // Will emit 'error' event

367

}

368

callback(null, result);

369

});

370

});

371

372

// Promise handler error handling

373

const promiseTransformer = transform(async (record) => {

374

try {

375

return await riskyOperation(record);

376

} catch (err) {

377

throw new Error(`Failed to process record: ${err.message}`);

378

}

379

});

380

```