Node.js streaming interfaces for processing Avro data, with support for raw value encoding/decoding and container file block processing. These streams enable efficient handling of large datasets with backpressure management.
Process sequences of Avro values without container file headers.
/**
 * Duplex stream for decoding raw Avro data
 */
class RawDecoder extends stream.Duplex {
  /**
   * Create raw decoder for concatenated Avro values
   * @param {Object|Type} schema - Schema or Type instance
   * @param {Object} opts - Options object
   */
  constructor(schema, opts);
}

/**
 * Transform stream for encoding raw Avro data
 */
class RawEncoder extends stream.Transform {
  /**
   * Create raw encoder for concatenated Avro values
   * @param {Object|Type} schema - Schema or Type instance
   * @param {Object} opts - Options object
   */
  constructor(schema, opts);
}

Usage Examples:
const avro = require('avro-js');
const { RawDecoder, RawEncoder } = avro.streams;
const schema = {
  type: 'record',
  name: 'Event',
  fields: [
    { name: 'timestamp', type: 'long' },
    { name: 'message', type: 'string' }
  ]
};
// Encode stream of objects
const encoder = new RawEncoder(schema);
encoder.write({ timestamp: Date.now(), message: 'Hello' });
encoder.write({ timestamp: Date.now(), message: 'World' });
encoder.end();
// Decode stream back to objects
const decoder = new RawDecoder(schema);
decoder.on('data', (event) => {
  console.log('Event:', event.message, 'at', new Date(event.timestamp));
});
// Connect encoder to decoder
encoder.pipe(decoder);
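// Raw data carries no schema, so a reader must be given the same schema
// the writer used. A minimal file round-trip sketch ('./events.raw' is a
// placeholder path):
const fs = require('fs');
const out = fs.createWriteStream('./events.raw');
const fileEncoder = new RawEncoder(schema);
fileEncoder.pipe(out);
fileEncoder.write({ timestamp: Date.now(), message: 'Persisted' });
fileEncoder.end();
out.on('finish', () => {
  fs.createReadStream('./events.raw')
    .pipe(new RawDecoder(schema))
    .on('data', (event) => console.log('Read back:', event.message));
});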
// Get raw buffers instead of objects
const rawDecoder = new RawDecoder(schema, { decode: false });
rawDecoder.on('data', (buffer) => {
  console.log('Raw buffer:', buffer);
});
Process Avro container files with block-level granularity and compression support.

/**
 * Duplex stream for decoding Avro container files
 */
class BlockDecoder extends stream.Duplex {
  /**
   * Create block decoder for container files
   * @param {Object} opts - Options object
   */
  constructor(opts);

  /**
   * Get default compression codecs
   * @returns {Object} Default codecs (null, deflate)
   */
  static getDefaultCodecs();
}

/**
 * Duplex stream for encoding Avro container files
 */
class BlockEncoder extends stream.Duplex {
  /**
   * Create block encoder for container files
   * @param {Object|Type} schema - Schema or Type instance
   * @param {Object} opts - Options object
   */
  constructor(schema, opts);

  /**
   * Get default compression codecs
   * @returns {Object} Default codecs (null, deflate)
   */
  static getDefaultCodecs();

  /**
   * Get downstream writable stream
   * @returns {WritableStream} Downstream stream
   */
  getDownstream();
}

Usage Examples:
const avro = require('avro-js');
const { BlockDecoder, BlockEncoder } = avro.streams;
const fs = require('fs');
const zlib = require('zlib');
// Decode container file with compression
const decoder = new BlockDecoder({
  decode: true,
  codecs: {
    'null': (buf, cb) => cb(null, buf),
    'deflate': zlib.inflateRaw,
    // Avro appends a 4-byte CRC32 checksum to each snappy-compressed
    // block; strip it before decompressing.
    'snappy': (buf, cb) => require('snappy').uncompress(buf.slice(0, buf.length - 4), cb)
  }
});
// Listen for file metadata
decoder.on('metadata', (type, codec, header) => {
  console.log('Schema:', type.toString());
  console.log('Codec:', codec);
  console.log('File metadata:', header.meta);
});
// Process records
decoder.on('data', (record) => {
  console.log('Record:', record);
});
fs.createReadStream('./data.avro').pipe(decoder);
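// The 'metadata' event also exposes the writer's type, which can be fed
// straight into a new encoder. A sketch that re-encodes a file with the
// deflate codec ('./copy.avro' is a placeholder path):
const copyDecoder = new BlockDecoder();
copyDecoder.on('metadata', (type) => {
  copyDecoder
    .pipe(new BlockEncoder(type, { codec: 'deflate' }))
    .pipe(fs.createWriteStream('./copy.avro'));
});
fs.createReadStream('./data.avro').pipe(copyDecoder);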
// Encode with compression
const schema = { type: 'string' };
const encoder = new BlockEncoder(schema, {
  codec: 'deflate',
  blockSize: 1024 * 16, // 16KB blocks
  codecs: BlockEncoder.getDefaultCodecs()
});
encoder.pipe(fs.createWriteStream('./output.avro'));
encoder.write('Hello');
encoder.write('World');
encoder.end();

Chain multiple streams for complex data processing workflows.
Usage Examples:
const avro = require('avro-js');
const fs = require('fs');
const { Transform } = require('stream');
// Custom transform stream
class DataProcessor extends Transform {
  constructor(options) {
    super({ objectMode: true, ...options });
  }

  _transform(record, encoding, callback) {
    // Process the record
    const processed = {
      ...record,
      processed: true,
      timestamp: Date.now()
    };
    callback(null, processed);
  }
}
// Pipeline: File -> Decode -> Process -> Encode -> File
const inputSchema = { type: 'record', name: 'Input', fields: [/* ... */] };
const outputSchema = { type: 'record', name: 'Output', fields: [/* ... */] };
fs.createReadStream('./input.avro')
  .pipe(new avro.streams.BlockDecoder())
  .pipe(new DataProcessor())
  .pipe(new avro.streams.BlockEncoder(outputSchema))
  .pipe(fs.createWriteStream('./output.avro'));
// Error handling: .pipe() does not propagate errors between streams,
// so use stream.pipeline, which forwards errors and handles cleanup
const pipeline = require('stream').pipeline;
pipeline(
  fs.createReadStream('./input.avro'),
  new avro.streams.BlockDecoder(),
  new DataProcessor(),
  new avro.streams.BlockEncoder(outputSchema),
  fs.createWriteStream('./output.avro'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
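Because the decoder's readable side is a standard Node.js readable stream, records can also be consumed with async iteration instead of 'data' handlers. A minimal sketch (the input path is a placeholder):

const fs = require('fs');
const avro = require('avro-js');

async function readAll() {
  const decoder = fs.createReadStream('./input.avro')
    .pipe(new avro.streams.BlockDecoder());
  // for await...of applies backpressure automatically: the decoder only
  // produces more records as the loop consumes them.
  for await (const record of decoder) {
    console.log('Record:', record);
  }
}
readAll().catch(console.error);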
Monitor stream processing with built-in events.

// BlockDecoder events
interface BlockDecoderEvents {
  /** Emitted when file metadata is parsed */
  'metadata': (type: Type, codec: string, header: Object) => void;
  /** Emitted for each decoded record */
  'data': (record: any) => void;
  /** Emitted when stream ends */
  'end': () => void;
  /** Emitted on errors */
  'error': (err: Error) => void;
}

// BlockEncoder events
interface BlockEncoderEvents {
  /** Emitted when a block is written */
  'block': (count: number, size: number) => void;
  /** Emitted when stream is finished */
  'finish': () => void;
  /** Emitted on errors */
  'error': (err: Error) => void;
}

Usage Examples:
const decoder = new avro.streams.BlockDecoder();
decoder.on('metadata', (type, codec, header) => {
  console.log(`Processing ${codec} compressed file`);
  console.log(`Schema: ${type.getName()}`);
});
decoder.on('data', (record) => {
  // Process each record
});
decoder.on('error', (err) => {
  console.error('Decoder error:', err);
});
const encoder = new avro.streams.BlockEncoder(schema);
encoder.on('block', (count, size) => {
  console.log(`Wrote block: ${count} records, ${size} bytes`);
});
encoder.on('finish', () => {
  console.log('Encoding complete');
});
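// Running totals can be accumulated across blocks for progress reporting
// (a sketch, assuming the (count, size) signature documented above):
let totalRecords = 0;
let totalBytes = 0;
encoder.on('block', (count, size) => {
  totalRecords += count;
  totalBytes += size;
});
encoder.on('finish', () => {
  console.log(`Encoded ${totalRecords} records in ${totalBytes} bytes`);
});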
Optimize memory usage for large datasets.

Usage Examples:
const avro = require('avro-js');
const fs = require('fs');
const { Transform } = require('stream');
// Configure buffer sizes for memory efficiency
const decoder = new avro.streams.BlockDecoder({
  decode: true,
  // Don't buffer entire file in memory
  highWaterMark: 1024 * 16 // 16KB chunks
});
const encoder = new avro.streams.BlockEncoder(schema, {
  blockSize: 1024 * 64, // 64KB blocks
  highWaterMark: 1024 * 16 // 16KB stream buffer
});
// Process large files without loading everything into memory
fs.createReadStream('./large-file.avro', { highWaterMark: 1024 * 16 })
  .pipe(decoder)
  .pipe(new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      // Process one record at a time
      callback(null, processRecord(record));
    }
  }))
  .pipe(encoder)
  .pipe(fs.createWriteStream('./processed-file.avro'));
All stream classes are available through the streams export:

const streams = {
  RawDecoder,
  RawEncoder,
  BlockDecoder,
  BlockEncoder
};

interface RawStreamOptions {
  /** Whether to decode values to JavaScript objects (default: true) */
  decode?: boolean;
  /** Whether to batch writes for performance */
  batchWrites?: boolean;
  /** Stream high water mark */
  highWaterMark?: number;
}
interface BlockDecoderOptions {
  /** Whether to decode values to JavaScript objects (default: true) */
  decode?: boolean;
  /** Custom compression codecs */
  codecs?: {
    [codecName: string]: (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  };
  /** Schema parsing options */
  parseOpts?: ParseOptions;
  /** Stream high water mark */
  highWaterMark?: number;
}

interface BlockEncoderOptions {
  /** Block size in bytes (default: 65536) */
  blockSize?: number;
  /** Compression codec name (default: 'null') */
  codec?: string;
  /** Custom compression codecs */
  codecs?: {
    [codecName: string]: (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  };
  /** Omit file header for raw block output */
  omitHeader?: boolean;
  /** Custom 16-byte sync marker */
  syncMarker?: Buffer;
  /** Metadata to include in file header */
  metadata?: { [key: string]: Buffer };
  /** Stream high water mark */
  highWaterMark?: number;
}
interface DefaultCodecs {
  /** No compression */
  'null': (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
  /** Deflate compression */
  'deflate': (buf: Buffer, cb: (err: Error, result: Buffer) => void) => void;
}