CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-avro-js

Pure JavaScript implementation of Apache Avro specification for fast binary serialization with schema evolution support

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/streams.md

Streaming

Node.js streaming interfaces for processing Avro data with support for raw value encoding/decoding and container file block processing. Enables efficient handling of large datasets with backpressure management.

Capabilities

Raw Data Streaming

Process sequences of Avro values without container file headers.

/**
 * Duplex stream for decoding raw Avro data
 */
class RawDecoder extends stream.Duplex {
 /**
  * Create raw decoder for concatenated Avro values
  * @param {Object|Type} schema - Schema or Type instance
  * @param {Object} opts - Options object
  */
 constructor(schema, opts);
}

/**
 * Transform stream for encoding raw Avro data
 */
class RawEncoder extends stream.Transform {
 /**
  * Create raw encoder for concatenated Avro values
  * @param {Object|Type} schema - Schema or Type instance
  * @param {Object} opts - Options object
  */
 constructor(schema, opts);
}

Usage Examples:

const avro = require('avro-js');
const { RawDecoder, RawEncoder } = avro.streams;

const schema = {
  type: 'record',
  name: 'Event',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'message', type: 'string' }
  ]
};

// Encode stream of objects
const encoder = new RawEncoder(schema);
encoder.write({ timestamp: Date.now(), message: 'Hello' });
encoder.write({ timestamp: Date.now(), message: 'World' });
encoder.end();

// Decode stream back to objects
const decoder = new RawDecoder(schema);
decoder.on('data', (event) => {
  console.log('Event:', event.message, 'at', new Date(event.timestamp));
});

// Connect encoder to decoder
encoder.pipe(decoder);

// Get raw buffers instead of objects
const rawDecoder = new RawDecoder(schema, { decode: false });
rawDecoder.on('data', (buffer) => {
  console.log('Raw buffer:', buffer);
});

Container File Streaming

Process Avro container files with block-level granularity and compression support.

/**
 * Duplex stream for decoding Avro container files
 */
class BlockDecoder extends stream.Duplex {
 /**
  * Create block decoder for container files
  * @param {Object} opts - Options object
  */
 constructor(opts);

 /**
  * Get default compression codecs
  * @returns {Object} Default codecs (null, deflate)
  */
 static getDefaultCodecs();
}

/**
 * Duplex stream for encoding Avro container files
 */
class BlockEncoder extends stream.Duplex {
 /**
  * Create block encoder for container files
  * @param {Object|Type} schema - Schema or Type instance
  * @param {Object} opts - Options object
  */
 constructor(schema, opts);

 /**
  * Get default compression codecs
  * @returns {Object} Default codecs (null, deflate)
  */
 static getDefaultCodecs();

 /**
  * Get downstream writable stream
  * @returns {WritableStream} Downstream stream
  */
 getDownstream();
}

Usage Examples:

const avro = require('avro-js');
const { BlockDecoder, BlockEncoder } = avro.streams;
const fs = require('fs');
const zlib = require('zlib');

// Decode container file with compression
const decoder = new BlockDecoder({
  decode: true,
  codecs: {
    'null': (buf, cb) => cb(null, buf),
    'deflate': zlib.inflateRaw,
    'snappy': require('snappy').uncompress
  }
});

// Listen for file metadata
decoder.on('metadata', (type, codec, header) => {
  console.log('Schema:', type.toString());
  console.log('Codec:', codec);
  console.log('File metadata:', header.meta);
});

// Process records
decoder.on('data', (record) => {
  console.log('Record:', record);
});

fs.createReadStream('./data.avro').pipe(decoder);

// Encode with compression
const schema = { type: 'string' };
const encoder = new BlockEncoder(schema, {
  codec: 'deflate',
  blockSize: 1024 * 16,  // 16KB blocks
  codecs: BlockEncoder.getDefaultCodecs()
});

encoder.pipe(fs.createWriteStream('./output.avro'));
encoder.write('Hello');
encoder.write('World');
encoder.end();

Stream Pipeline Examples

Chain multiple streams for complex data processing workflows.

Usage Examples:

const avro = require('avro-js');
const { Transform } = require('stream');

// Custom transform stream
class DataProcessor extends Transform {
  constructor(options) {
    super({ objectMode: true, ...options });
  }
  
  _transform(record, encoding, callback) {
    // Process the record
    const processed = {
      ...record,
      processed: true,
      timestamp: Date.now()
    };
    callback(null, processed);
  }
}

// Pipeline: File -> Decode -> Process -> Encode -> File
const inputSchema = { type: 'record', name: 'Input', fields: [/* ... */] };
const outputSchema = { type: 'record', name: 'Output', fields: [/* ... */] };

fs.createReadStream('./input.avro')
  .pipe(new avro.streams.BlockDecoder())
  .pipe(new DataProcessor())
  .pipe(new avro.streams.BlockEncoder(outputSchema))
  .pipe(fs.createWriteStream('./output.avro'));

// Error handling in pipelines
const pipeline = require('stream').pipeline;

pipeline(
  fs.createReadStream('./input.avro'),
  new avro.streams.BlockDecoder(),
  new DataProcessor(),
  new avro.streams.BlockEncoder(outputSchema),
  fs.createWriteStream('./output.avro'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Stream Events

Monitor stream processing with built-in events.

// BlockDecoder events
interface BlockDecoderEvents {
  /** Emitted when file metadata is parsed */
  'metadata': (type: Type, codec: string, header: Object) => void;
  /** Emitted for each decoded record */
  'data': (record: any) => void;
  /** Emitted when stream ends */
  'end': () => void;
  /** Emitted on errors */
  'error': (err: Error) => void;
}

// BlockEncoder events  
interface BlockEncoderEvents {
  /** Emitted when a block is written */
  'block': (count: number, size: number) => void;
  /** Emitted when stream is finished */
  'finish': () => void;
  /** Emitted on errors */
  'error': (err: Error) => void;
}

Usage Examples:

const decoder = new avro.streams.BlockDecoder();

decoder.on('metadata', (type, codec, header) => {
  console.log(`Processing ${codec} compressed file`);
  console.log(`Schema: ${type.getName()}`);
});

decoder.on('data', (record) => {
  // Process each record
});

decoder.on('error', (err) => {
  console.error('Decoder error:', err);
});

const encoder = new avro.streams.BlockEncoder(schema);

encoder.on('block', (count, size) => {
  console.log(`Wrote block: ${count} records, ${size} bytes`);
});

encoder.on('finish', () => {
  console.log('Encoding complete');
});

Memory Management

Optimize memory usage for large datasets.

Usage Examples:

const avro = require('avro-js');

// Configure buffer sizes for memory efficiency
const decoder = new avro.streams.BlockDecoder({
  decode: true,
  // Don't buffer entire file in memory
  highWaterMark: 1024 * 16  // 16KB chunks
});

const encoder = new avro.streams.BlockEncoder(schema, {
  blockSize: 1024 * 64,     // 64KB blocks
  highWaterMark: 1024 * 16  // 16KB stream buffer
});

// Process large files without loading everything into memory
fs.createReadStream('./large-file.avro', { highWaterMark: 1024 * 16 })
  .pipe(decoder)
  .pipe(new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      // Process one record at a time
      callback(null, processRecord(record));
    }
  }))
  .pipe(encoder)
  .pipe(fs.createWriteStream('./processed-file.avro'));

Stream Access

All stream classes are available through the streams export:

const streams = {
  RawDecoder,
  RawEncoder,  
  BlockDecoder,
  BlockEncoder
};

Stream Options

interface RawStreamOptions {
  /** Whether to decode values to JavaScript objects (default: true) */
  decode?: boolean;
  /** Whether to batch writes for performance */
  batchWrites?: boolean;
  /** Stream high water mark */
  highWaterMark?: number;
}

interface BlockDecoderOptions {
  /** Whether to decode values to JavaScript objects (default: true) */
  decode?: boolean;
  /** Custom compression codecs */
  codecs?: {
    [codecName: string]: (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  };
  /** Schema parsing options */
  parseOpts?: ParseOptions;
  /** Stream high water mark */
  highWaterMark?: number;
}

interface BlockEncoderOptions {
  /** Block size in bytes (default: 65536) */
  blockSize?: number;
  /** Compression codec name (default: 'null') */
  codec?: string;
  /** Custom compression codecs */
  codecs?: {
    [codecName: string]: (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  };
  /** Omit file header for raw block output */
  omitHeader?: boolean;
  /** Custom 16-byte sync marker */
  syncMarker?: Buffer;
  /** Metadata to include in file header */
  metadata?: { [key: string]: Buffer };
  /** Stream high water mark */
  highWaterMark?: number;
}

interface DefaultCodecs {
  /** No compression */
  'null': (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  /** Deflate compression */
  'deflate': (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
}

docs

files.md

index.md

protocols.md

streams.md

types.md

utils.md

tile.json