Streaming

Node.js streaming interfaces for processing Avro data with support for raw value encoding/decoding and container file block processing. Enables efficient handling of large datasets with backpressure management.
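These classes follow standard Node.js stream semantics, so backpressure is handled with the usual write()/drain contract. A minimal sketch (the schema, file name, and record count are illustrative) that throttles a producer against a BlockEncoder:

const avro = require('avro-js');
const fs = require('fs');

const type = avro.parse({
  type: 'record',
  name: 'Sample',
  fields: [{ name: 'n', type: 'int' }]
});

const encoder = new avro.streams.BlockEncoder(type);
encoder.pipe(fs.createWriteStream('./samples.avro'));

let i = 0;
(function writeSome() {
  let ok = true;
  while (i < 1000000 && ok) {
    // write() returns false once the internal buffer fills up
    ok = encoder.write({ n: i++ });
  }
  if (i < 1000000) {
    encoder.once('drain', writeSome); // resume only after the buffer drains
  } else {
    encoder.end();
  }
})();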

Capabilities

Raw Data Streaming

Process sequences of Avro values without container file headers.

/**
 * Duplex stream for decoding raw Avro data
 */
class RawDecoder extends stream.Duplex {
  /**
   * Create raw decoder for concatenated Avro values
   * @param {Object|Type} schema - Schema or Type instance
   * @param {Object} opts - Options object
   */
  constructor(schema, opts);
}

/**
 * Transform stream for encoding raw Avro data
 */
class RawEncoder extends stream.Transform {
  /**
   * Create raw encoder for concatenated Avro values
   * @param {Object|Type} schema - Schema or Type instance
   * @param {Object} opts - Options object
   */
  constructor(schema, opts);
}

Usage Examples:

const avro = require('avro-js');
const { RawDecoder, RawEncoder } = avro.streams;

const schema = {
  type: 'record',
  name: 'Event',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'message', type: 'string' }
  ]
};

// Encode stream of objects
const encoder = new RawEncoder(schema);
encoder.write({ timestamp: Date.now(), message: 'Hello' });
encoder.write({ timestamp: Date.now(), message: 'World' });
encoder.end();

// Decode stream back to objects
const decoder = new RawDecoder(schema);
decoder.on('data', (event) => {
  console.log('Event:', event.message, 'at', new Date(event.timestamp));
});

// Connect encoder to decoder
encoder.pipe(decoder);

// Get raw buffers instead of objects (pipe encoded input into it, as above)
const rawDecoder = new RawDecoder(schema, { decode: false });
rawDecoder.on('data', (buffer) => {
  console.log('Raw buffer:', buffer);
});
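Because raw streams carry neither a header nor sync markers, the schema must travel out of band: the reader has to be constructed with the same schema the writer used. A minimal sketch (the file name is illustrative) persisting raw values and decoding them later:

const fs = require('fs');

// Write concatenated raw values; the file contains no schema information
const fileEncoder = new RawEncoder(schema);
fileEncoder.pipe(fs.createWriteStream('./events.raw'));
fileEncoder.write({ timestamp: Date.now(), message: 'persisted' });
fileEncoder.end();

// Decode later with the same schema, obtained out of band
fs.createReadStream('./events.raw')
  .pipe(new RawDecoder(schema))
  .on('data', (event) => console.log(event.message));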

Container File Streaming

Process Avro container files with block-level granularity and compression support.

/**
 * Duplex stream for decoding Avro container files
 */
class BlockDecoder extends stream.Duplex {
  /**
   * Create block decoder for container files
   * @param {Object} opts - Options object
   */
  constructor(opts);

  /**
   * Get default compression codecs
   * @returns {Object} Default codecs (null, deflate)
   */
  static getDefaultCodecs();
}

/**
 * Duplex stream for encoding Avro container files
 */
class BlockEncoder extends stream.Duplex {
  /**
   * Create block encoder for container files
   * @param {Object|Type} schema - Schema or Type instance
   * @param {Object} opts - Options object
   */
  constructor(schema, opts);

  /**
   * Get default compression codecs
   * @returns {Object} Default codecs (null, deflate)
   */
  static getDefaultCodecs();

  /**
   * Get downstream writable stream
   * @returns {WritableStream} Downstream stream
   */
  getDownstream();
}

Usage Examples:

const avro = require('avro-js');
const { BlockDecoder, BlockEncoder } = avro.streams;
const fs = require('fs');
const zlib = require('zlib');

// Decode container file with compression
const decoder = new BlockDecoder({
  decode: true,
  codecs: {
    'null': (buf, cb) => cb(null, buf),
    'deflate': zlib.inflateRaw,
    'snappy': require('snappy').uncompress // requires the optional snappy package (callback API)
  }
});

// Listen for file metadata
decoder.on('metadata', (type, codec, header) => {
  console.log('Schema:', type.toString());
  console.log('Codec:', codec);
  console.log('File metadata:', header.meta);
});

// Process records
decoder.on('data', (record) => {
  console.log('Record:', record);
});

fs.createReadStream('./data.avro').pipe(decoder);

// Encode with compression
const schema = { type: 'string' };
const encoder = new BlockEncoder(schema, {
  codec: 'deflate',
  blockSize: 1024 * 16,  // 16KB blocks
  codecs: BlockEncoder.getDefaultCodecs()
});

encoder.pipe(fs.createWriteStream('./output.avro'));
encoder.write('Hello');
encoder.write('World');
encoder.end();

Stream Pipeline Examples

Chain multiple streams for complex data processing workflows.

Usage Examples:

const avro = require('avro-js');
const fs = require('fs');
const { Transform } = require('stream');

// Custom transform stream
class DataProcessor extends Transform {
  constructor(options) {
    super({ objectMode: true, ...options });
  }
  
  _transform(record, encoding, callback) {
    // Process the record
    const processed = {
      ...record,
      processed: true,
      timestamp: Date.now()
    };
    callback(null, processed);
  }
}

// Pipeline: File -> Decode -> Process -> Encode -> File
const inputSchema = { type: 'record', name: 'Input', fields: [/* ... */] };
const outputSchema = { type: 'record', name: 'Output', fields: [/* ... */] };

fs.createReadStream('./input.avro')
  .pipe(new avro.streams.BlockDecoder())
  .pipe(new DataProcessor())
  .pipe(new avro.streams.BlockEncoder(outputSchema))
  .pipe(fs.createWriteStream('./output.avro'));

// Error handling in pipelines
const pipeline = require('stream').pipeline;

pipeline(
  fs.createReadStream('./input.avro'),
  new avro.streams.BlockDecoder(),
  new DataProcessor(),
  new avro.streams.BlockEncoder(outputSchema),
  fs.createWriteStream('./output.avro'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Stream Events

Monitor stream processing with built-in events.

// BlockDecoder events
interface BlockDecoderEvents {
  /** Emitted when file metadata is parsed */
  'metadata': (type: Type, codec: string, header: Object) => void;
  /** Emitted for each decoded record */
  'data': (record: any) => void;
  /** Emitted when stream ends */
  'end': () => void;
  /** Emitted on errors */
  'error': (err: Error) => void;
}

// BlockEncoder events  
interface BlockEncoderEvents {
  /** Emitted when a block is written */
  'block': (count: number, size: number) => void;
  /** Emitted when stream is finished */
  'finish': () => void;
  /** Emitted on errors */
  'error': (err: Error) => void;
}

Usage Examples:

const decoder = new avro.streams.BlockDecoder();

decoder.on('metadata', (type, codec, header) => {
  console.log(`Processing ${codec}-compressed file`);
  console.log(`Schema: ${type.getName()}`);
});

decoder.on('data', (record) => {
  // Process each record
});

decoder.on('error', (err) => {
  console.error('Decoder error:', err);
});

const encoder = new avro.streams.BlockEncoder(schema);

encoder.on('block', (count, size) => {
  console.log(`Wrote block: ${count} records, ${size} bytes`);
});

encoder.on('finish', () => {
  console.log('Encoding complete');
});

Memory Management

Optimize memory usage for large datasets.

Usage Examples:

const avro = require('avro-js');
const fs = require('fs');
const { Transform } = require('stream');

// Configure buffer sizes for memory efficiency
const decoder = new avro.streams.BlockDecoder({
  decode: true,
  // Don't buffer entire file in memory
  highWaterMark: 1024 * 16  // 16KB chunks
});

// Reusing a schema defined as in the earlier examples
const encoder = new avro.streams.BlockEncoder(schema, {
  blockSize: 1024 * 64,     // 64KB blocks
  highWaterMark: 1024 * 16  // 16KB stream buffer
});

// Process large files without loading everything into memory
fs.createReadStream('./large-file.avro', { highWaterMark: 1024 * 16 })
  .pipe(decoder)
  .pipe(new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      // Process one record at a time; processRecord is application-defined
      callback(null, processRecord(record));
    }
  }))
  .pipe(encoder)
  .pipe(fs.createWriteStream('./processed-file.avro'));

Stream Access

All stream classes are available through the streams export:

const streams = {
  RawDecoder,
  RawEncoder,  
  BlockDecoder,
  BlockEncoder
};
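Both access paths are equivalent:

const avro = require('avro-js');

// Namespaced access
const decoder = new avro.streams.BlockDecoder();

// Destructured access
const { RawEncoder, BlockEncoder } = avro.streams;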

Stream Options

interface RawStreamOptions {
  /** Whether to decode values to JavaScript objects (default: true) */
  decode?: boolean;
  /** Whether to batch writes for performance */
  batchWrites?: boolean;
  /** Stream high water mark */
  highWaterMark?: number;
}
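For example, a sketch combining these options (the exact coalescing behavior of batchWrites is implementation-defined; the sizes are illustrative):

// Emit undecoded Buffers and keep internal buffering small
const rawDecoder = new RawDecoder(schema, {
  decode: false,
  highWaterMark: 1024 * 8
});

// Coalesce small writes for throughput
const rawEncoder = new RawEncoder(schema, {
  batchWrites: true,
  highWaterMark: 1024 * 8
});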

interface BlockDecoderOptions {
  /** Whether to decode values to JavaScript objects (default: true) */
  decode?: boolean;
  /** Custom compression codecs */
  codecs?: {
    [codecName: string]: (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  };
  /** Schema parsing options */
  parseOpts?: ParseOptions;
  /** Stream high water mark */
  highWaterMark?: number;
}

interface BlockEncoderOptions {
  /** Block size in bytes (default: 65536) */
  blockSize?: number;
  /** Compression codec name (default: 'null') */
  codec?: string;
  /** Custom compression codecs */
  codecs?: {
    [codecName: string]: (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  };
  /** Omit file header for raw block output */
  omitHeader?: boolean;
  /** Custom 16-byte sync marker */
  syncMarker?: Buffer;
  /** Metadata to include in file header */
  metadata?: { [key: string]: Buffer };
  /** Stream high water mark */
  highWaterMark?: number;
}
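A sketch exercising the header-related options. The values are illustrative, and omitHeader output is only useful where the header (and a matching sync marker) is supplied elsewhere, e.g. when appending blocks to an existing file; existingFileSyncMarker is a placeholder for the marker read from that file:

const crypto = require('crypto');

const headerEncoder = new BlockEncoder(schema, {
  codec: 'deflate',
  syncMarker: crypto.randomBytes(16),               // must be exactly 16 bytes
  metadata: { 'app.version': Buffer.from('1.2.3') }
});

// Header-less blocks, e.g. for appending to a file that already has a header
const appendEncoder = new BlockEncoder(schema, {
  omitHeader: true,
  syncMarker: existingFileSyncMarker // placeholder: must match the target file's marker
});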

interface DefaultCodecs {
  /** No compression */
  'null': (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  /** Deflate compression */
  'deflate': (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
}
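Codecs beyond the defaults are supplied as functions with the same (buf, cb) signature. A hedged sketch registering snappy for encoding, assuming the optional snappy package's callback API (newer, promise-based releases would need wrapping; note also that the Avro spec requires a CRC32 suffix on snappy-compressed blocks, omitted here for brevity):

const snappy = require('snappy'); // optional dependency, not bundled

const codecs = {
  ...BlockEncoder.getDefaultCodecs(),
  // Older snappy releases expose compress(buf, cb); promise-based releases need a wrapper
  'snappy': (buf, cb) => snappy.compress(buf, cb)
};

const snappyEncoder = new BlockEncoder(schema, { codec: 'snappy', codecs });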