# Transform Streams

Transform streams extend Duplex with specialized transformation capabilities, mapping data written to the stream into transformed readable output.

## Capabilities

### Transform Constructor

Creates a new Transform stream with transformation configuration.

```javascript { .api }
/**
 * Creates a new Transform stream
 * @param opts - Configuration options including transformation function
 */
class Transform extends Duplex {
  constructor(opts?: TransformOptions);
}

interface TransformOptions extends DuplexOptions {
  /** Transform function shorthand */
  transform?: (data: any, cb: (error?: Error, result?: any) => void) => void;
  /** Flush function shorthand */
  flush?: (cb: (error?: Error, result?: any) => void) => void;
}
```

**Usage Example:**

```javascript
const { Transform } = require("@mafintosh/streamx");

const upperCaseTransform = new Transform({
  transform(data, cb) {
    const result = data.toString().toUpperCase();
    cb(null, result);
  }
});
```

### Transformation Logic

#### _transform Method

Called to transform incoming data. Override to implement transformation logic.

```javascript { .api }
/**
 * Transform incoming data
 * @param data - Data to transform
 * @param callback - Callback with error and transformed result, or use push()
 */
_transform(data: any, callback: (error?: Error, result?: any) => void): void;
```

**Usage Example:**

```javascript
class JSONTransform extends Transform {
  _transform(data, cb) {
    try {
      const parsed = JSON.parse(data.toString());
      const transformed = {
        ...parsed,
        processed: true,
        timestamp: Date.now()
      };
      cb(null, JSON.stringify(transformed));
    } catch (err) {
      cb(err);
    }
  }
}
```

#### _flush Method

Called at the end of the transform to flush any remaining data.

```javascript { .api }
/**
 * Called to flush any remaining data at the end of transformation
 * @param callback - Callback with error and optional final data
 */
_flush(callback: (error?: Error, result?: any) => void): void;
```
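
The `flush` shorthand from `TransformOptions` pairs naturally with `transform` for summary output. A minimal sketch of an options object that passes chunks through and emits a byte total as one final chunk (per the interface above, the flush callback's second argument is pushed as a last readable chunk; the `total` counter is illustrative):

```javascript
// Pass-through options that append a byte total on flush.
// Wire it up with `new Transform(opts)`.
let total = 0;
const opts = {
  transform(data, cb) {
    total += data.length; // track bytes seen so far
    cb(null, data);       // pass the chunk through unchanged
  },
  flush(cb) {
    cb(null, `total: ${total} bytes`); // emitted after the last chunk
  }
};
```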

#### _final Method

Called internally by the stream when ending. Automatically calls `_flush`.

```javascript { .api }
/**
 * Called internally when the transform stream is ending
 * @param callback - Callback to call when final processing is complete
 */
_final(callback: (error?: Error) => void): void;
```
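
The `_final` → `_flush` hand-off can be pictured with a small stand-in class (a conceptual model only, not the library source; `pushed` stands in for the readable buffer):

```javascript
// Conceptual model: ending the stream calls _final, which runs _flush,
// pushes any final chunk it produced, then pushes null to end the stream.
class FinalFlushModel {
  constructor() { this.pushed = []; }
  push(data) { this.pushed.push(data); } // stand-in for stream.push()
  _flush(cb) { cb(null, 'tail'); }       // user hook: optional last chunk
  _final(cb) {
    this._flush((err, data) => {
      if (err) return cb(err);
      if (data != null) this.push(data); // flush result becomes readable data
      this.push(null);                   // end-of-stream marker
      cb(null);
    });
  }
}
```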

### Transform Patterns

#### Using Callback

Transform data and pass the result via the callback:

```javascript
const csvToJson = new Transform({
  transform(data, cb) {
    const lines = data.toString().split('\n');
    const result = lines.map(line => {
      const [name, age] = line.split(',');
      return JSON.stringify({ name, age: parseInt(age, 10) });
    }).join('\n');
    cb(null, result);
  }
});
```

#### Using Push

Transform data and push results directly:

```javascript
const splitTransform = new Transform({
  transform(data, cb) {
    const lines = data.toString().split('\n');
    lines.forEach(line => {
      if (line.trim()) {
        this.push(line.trim());
      }
    });
    cb(null); // No result via callback
  }
});
```

#### Multiple Output

Generate multiple outputs for a single input:

```javascript
const duplicateTransform = new Transform({
  transform(data, cb) {
    const str = data.toString();
    this.push(`Original: ${str}`);
    this.push(`Copy: ${str}`);
    this.push(`Uppercase: ${str.toUpperCase()}`);
    cb(null);
  }
});
```

### Common Transform Examples

#### Text Processing

```javascript
const { Transform } = require("@mafintosh/streamx");

// Line-by-line processor
class LineProcessor extends Transform {
  constructor() {
    super();
    this.buffer = '';
  }

  _transform(chunk, cb) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');

    // Keep the last incomplete line in the buffer
    this.buffer = lines.pop();

    // Process complete lines
    lines.forEach(line => {
      if (line.trim()) {
        const processed = `[${new Date().toISOString()}] ${line}`;
        this.push(processed);
      }
    });

    cb(null);
  }

  _flush(cb) {
    // Process any remaining data in the buffer
    if (this.buffer.trim()) {
      const processed = `[${new Date().toISOString()}] ${this.buffer}`;
      this.push(processed);
    }
    cb(null);
  }
}
```

#### Data Validation and Filtering

```javascript
class ValidatorTransform extends Transform {
  constructor(validator) {
    super();
    this.validate = validator;
    this.validCount = 0;
    this.invalidCount = 0;
  }

  _transform(data, cb) {
    try {
      const item = JSON.parse(data.toString());
      if (this.validate(item)) {
        this.validCount++;
        this.push(JSON.stringify(item));
      } else {
        this.invalidCount++;
        console.warn('Invalid item:', item);
      }
    } catch (err) {
      this.invalidCount++;
      console.error('Parse error:', err.message);
    }
    cb(null);
  }

  _flush(cb) {
    console.log(`Processed: ${this.validCount} valid, ${this.invalidCount} invalid`);
    cb(null);
  }
}

// Usage
const validator = new ValidatorTransform(item =>
  typeof item.name === 'string' && typeof item.age === 'number'
);
```

#### Compression/Encoding Transform

```javascript
const crypto = require('crypto');

class Base64Transform extends Transform {
  _transform(data, cb) {
    const encoded = Buffer.from(data).toString('base64');
    cb(null, encoded);
  }
}

// Note: hashes each chunk independently, not the stream as a whole
class HashTransform extends Transform {
  constructor(algorithm = 'sha256') {
    super();
    this.algorithm = algorithm;
  }

  _transform(data, cb) {
    const hash = crypto.createHash(this.algorithm);
    hash.update(data);
    const result = hash.digest('hex');
    cb(null, result);
  }
}
```

### Pass-through Behavior

By default, Transform acts as a pass-through stream if no `_transform` is implemented:

```javascript
const passThrough = new Transform(); // Acts as pass-through

// Data written to it is emitted as readable data unchanged
passThrough.on('data', (chunk) => {
  console.log('Passed through:', chunk.toString()); // "Hello"
});

passThrough.write("Hello");
passThrough.end();
```

### Pipeline Usage

Transform streams compose naturally in pipelines:

```javascript
const { Readable, Writable, Transform } = require("@mafintosh/streamx");

// Create source data
const source = new Readable({
  read(cb) {
    this.push('hello world\n');
    this.push('foo bar\n');
    this.push(null);
    cb(null);
  }
});

// Create transforms
const upperCase = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

const addPrefix = new Transform({
  transform(data, cb) {
    cb(null, `>> ${data}`);
  }
});

// Create destination
const destination = new Writable({
  write(data, cb) {
    console.log('Final:', data.toString());
    cb(null);
  }
});

// Pipeline: the callback on the last pipe reports completion or error
source.pipe(upperCase).pipe(addPrefix).pipe(destination, (err) => {
  if (err) console.error('Pipeline error:', err);
  else console.log('Pipeline completed');
});
```

### Advanced Transform Features

#### Stateful Transformation

```javascript
class CounterTransform extends Transform {
  constructor() {
    super();
    this.lineNumber = 0;
  }

  _transform(data, cb) {
    const lines = data.toString().split('\n');
    const result = lines.map(line => {
      if (line.trim()) {
        return `${++this.lineNumber}: ${line}`;
      }
      return line;
    }).join('\n');
    cb(null, result);
  }
}
```

#### Async Transformation

```javascript
class AsyncTransform extends Transform {
  async _transform(data, cb) {
    try {
      // Simulate an async operation
      const result = await this.processAsync(data.toString());
      cb(null, result);
    } catch (err) {
      cb(err);
    }
  }

  async processAsync(data) {
    return new Promise(resolve => {
      setTimeout(() => {
        resolve(data.toUpperCase());
      }, 100);
    });
  }
}
```

### Events

Transform streams inherit all events from Duplex streams:

```javascript
const transform = new Transform({
  transform(data, cb) {
    cb(null, data.toString().toUpperCase());
  }
});

transform.on('data', (chunk) => {
  console.log('Transformed:', chunk.toString());
});

transform.on('finish', () => {
  console.log('All transforms completed');
});

transform.on('close', () => {
  console.log('Transform stream closed');
});

transform.write('hello');
transform.write('world');
transform.end();
```