or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

browser.mdfiles.mdindex.mdschemas.mdservices.mdstreams.mdtypes.md
tile.json

streams.mddocs/

Streaming Support

High-performance streaming interfaces for processing large datasets with support for both container files and raw data streams.

Capabilities

Container Constants

Avro container format constants for low-level operations.

/** Avro container magic bytes (4 bytes: 'Obj' + 0x01) */
const MAGIC_BYTES: Buffer;

/** Type definition for container headers */
const HEADER_TYPE: Type;

/** Type definition for container blocks */
const BLOCK_TYPE: Type;

Block Decoder

Duplex stream for decoding Avro container files with block-based processing.

class BlockDecoder extends stream.Duplex {
  /**
   * Create block decoder stream
   * @param opts - Decoder options
   */
  constructor(opts);
  
  /**
   * Get default compression codecs
   * @returns Object containing default codecs
   */
  static defaultCodecs();
}

Usage Examples:

const avsc = require('avsc');
const fs = require('fs');

// Decode container file blocks
const decoder = new avsc.streams.BlockDecoder();

fs.createReadStream('./data.avro')
  .pipe(decoder)
  .on('data', (record) => {
    console.log('Decoded record:', record);
  });

// With custom codecs
const decoder = new avsc.streams.BlockDecoder({
  codecs: {
    'lz4': lz4Codec,
    'custom': customCodec
  }
});

Block Encoder

Duplex stream for encoding Avro container files with block-based compression.

class BlockEncoder extends stream.Duplex {
  /**
   * Create block encoder stream
   * @param schema - Avro schema for encoding
   * @param opts - Encoder options
   */
  constructor(schema, opts);
  
  /**
   * Get default compression codecs
   * @returns Object containing default codecs
   */
  static defaultCodecs();
}

Usage Examples:

const avsc = require('avsc');
const fs = require('fs');

const schema = {
  type: 'record',
  name: 'LogEntry',
  fields: [
    {name: 'timestamp', type: 'long'},
    {name: 'message', type: 'string'},
    {name: 'level', type: {type: 'enum', name: 'Level', symbols: ['DEBUG', 'INFO', 'WARN', 'ERROR']}}
  ]
};

// Create encoder with compression
const encoder = new avsc.streams.BlockEncoder(schema, {
  codec: 'deflate',
  blockSize: 32768
});

encoder.pipe(fs.createWriteStream('./logs.avro'));

// Write log entries
encoder.write({timestamp: Date.now(), message: 'Application started', level: 'INFO'});
encoder.write({timestamp: Date.now(), message: 'Processing request', level: 'DEBUG'});
encoder.end();

Raw Decoder

Duplex stream for decoding raw Avro data without container format.

class RawDecoder extends stream.Duplex {
  /**
   * Create raw decoder stream
   * @param schema - Avro schema for decoding
   * @param opts - Decoder options
   */
  constructor(schema, opts);
}

Usage Examples:

const avsc = require('avsc');

const schema = 'string';
const decoder = new avsc.streams.RawDecoder(schema);

decoder.on('data', (value) => {
  console.log('Decoded string:', value);
});

// Decode raw bytes
decoder.write(Buffer.from([6, 102, 111, 111])); // Encodes "foo"
decoder.end();

Raw Encoder

Duplex stream for encoding raw Avro data without container format.

class RawEncoder extends stream.Duplex {
  /**
   * Create raw encoder stream
   * @param schema - Avro schema for encoding
   * @param opts - Encoder options
   */
  constructor(schema, opts);
}

Usage Examples:

const avsc = require('avsc');

const schema = {
  type: 'record',
  name: 'Point',
  fields: [
    {name: 'x', type: 'double'},
    {name: 'y', type: 'double'}
  ]
};

const encoder = new avsc.streams.RawEncoder(schema);

encoder.on('data', (buffer) => {
  console.log('Encoded bytes:', buffer);
});

// Encode coordinates
encoder.write({x: 1.5, y: 2.7});
encoder.write({x: -0.5, y: 3.14});
encoder.end();

Stream Events

All streaming classes emit standard Node.js stream events plus additional Avro-specific events:

interface StreamEvents {
  /** Standard stream events */
  'data': (chunk: any) => void;
  'end': () => void;
  'error': (err: Error) => void;
  'close': () => void;
  
  /** Avro-specific events (BlockDecoder only) */
  'metadata': (type: Type, codec: string, header: any) => void;
}

Usage Examples:

const decoder = new avsc.streams.BlockDecoder();

// Handle metadata when available
decoder.on('metadata', (type, codec, header) => {
  console.log('Schema:', type.schema());
  console.log('Compression:', codec);
  console.log('File metadata:', header.meta);
});

// Handle records
decoder.on('data', (record) => {
  console.log('Record:', record);
});

// Handle errors
decoder.on('error', (err) => {
  console.error('Stream error:', err);
});

Types

interface StreamOptions {
  /** Transform hook for records */
  transform?: (record: any) => any;
  
  /** Custom compression codecs */
  codecs?: {[name: string]: Codec};
  
  /** Whether to emit raw buffers instead of decoded values */
  noDecode?: boolean;
}

interface BlockEncoderOptions extends StreamOptions {
  /** Block size in bytes */
  blockSize?: number;
  
  /** Compression codec name */
  codec?: string;
  
  /** Whether to write container header */
  writeHeader?: boolean;
  
  /** Custom synchronization marker */
  syncMarker?: Buffer;
}

interface BlockDecoderOptions extends StreamOptions {
  /** Reader schema for schema evolution */
  readerSchema?: Type | any;
  
  /** Hook for custom schema parsing */
  parseHook?: (schema: any) => Type;
}

type Codec = (buffer: Buffer, callback: (err: Error | null, result?: Buffer) => void) => void;

Frame Streams

Low-level streaming classes for RPC frame encoding and decoding.

class FrameDecoder extends stream.Duplex {
  /**
   * Create frame decoder stream
   * @param opts - Decoder options
   */
  constructor(opts);
}

class FrameEncoder extends stream.Duplex {
  /**
   * Create frame encoder stream
   * @param opts - Encoder options
   */
  constructor(opts);
}

class NettyDecoder extends stream.Duplex {
  /**
   * Create Netty-format frame decoder stream
   * @param opts - Decoder options
   */
  constructor(opts);
}

class NettyEncoder extends stream.Duplex {
  /**
   * Create Netty-format frame encoder stream
   * @param opts - Encoder options
   */
  constructor(opts);
}