High-performance streaming interfaces for processing large datasets with support for both container files and raw data streams.
Avro container format constants for low-level operations.
/** Avro container magic bytes (4 bytes: 'Obj' + 0x01). */
const MAGIC_BYTES: Buffer;
/** Type definition for container headers. */
const HEADER_TYPE: Type;
/** Type definition for container blocks. */
const BLOCK_TYPE: Type;

Duplex stream for decoding Avro container files with block-based processing.
/**
 * Duplex stream that decodes an Avro container file into its records.
 * Emits a `'metadata'` event (type, codec, header) once the container
 * header has been parsed, then one `'data'` event per decoded record.
 */
class BlockDecoder extends stream.Duplex {
  /**
   * Create block decoder stream.
   * @param opts - Decoder options (see `BlockDecoderOptions`).
   */
  constructor(opts?: BlockDecoderOptions);
  /**
   * Get default compression codecs.
   * @returns Map of codec name to codec implementation.
   */
  static defaultCodecs(): {[name: string]: Codec};
}

Usage Examples:
const avsc = require('avsc');
const fs = require('fs');
// Decode container file blocks
const decoder = new avsc.streams.BlockDecoder();
fs.createReadStream('./data.avro')
.pipe(decoder)
.on('data', (record) => {
console.log('Decoded record:', record);
});
// With custom codecs
const decoder = new avsc.streams.BlockDecoder({
codecs: {
'lz4': lz4Codec,
'custom': customCodec
}
});Duplex stream for encoding Avro container files with block-based compression.
/**
 * Duplex stream that encodes written records into an Avro container file,
 * grouping them into compressed blocks.
 */
class BlockEncoder extends stream.Duplex {
  /**
   * Create block encoder stream.
   * @param schema - Avro schema (or resolved `Type`) used to encode records.
   * @param opts - Encoder options (see `BlockEncoderOptions`).
   */
  constructor(schema: Type | any, opts?: BlockEncoderOptions);
  /**
   * Get default compression codecs.
   * @returns Map of codec name to codec implementation.
   */
  static defaultCodecs(): {[name: string]: Codec};
}

Usage Examples:
const avsc = require('avsc');
const fs = require('fs');
const schema = {
type: 'record',
name: 'LogEntry',
fields: [
{name: 'timestamp', type: 'long'},
{name: 'message', type: 'string'},
{name: 'level', type: {type: 'enum', name: 'Level', symbols: ['DEBUG', 'INFO', 'WARN', 'ERROR']}}
]
};
// Create encoder with compression
const encoder = new avsc.streams.BlockEncoder(schema, {
codec: 'deflate',
blockSize: 32768
});
encoder.pipe(fs.createWriteStream('./logs.avro'));
// Write log entries
encoder.write({timestamp: Date.now(), message: 'Application started', level: 'INFO'});
encoder.write({timestamp: Date.now(), message: 'Processing request', level: 'DEBUG'});
encoder.end();Duplex stream for decoding raw Avro data without container format.
/**
 * Duplex stream that decodes raw Avro bytes (no container framing).
 */
class RawDecoder extends stream.Duplex {
  /**
   * Create raw decoder stream.
   * @param schema - Avro schema (or resolved `Type`) used to decode incoming bytes.
   * @param opts - Decoder options.
   */
  constructor(schema: Type | any, opts?);
}

Usage Examples:
const avsc = require('avsc');
const schema = 'string';
const decoder = new avsc.streams.RawDecoder(schema);
decoder.on('data', (value) => {
console.log('Decoded string:', value);
});
// Decode raw bytes
decoder.write(Buffer.from([6, 102, 111, 111])); // Encodes "foo"
decoder.end();Duplex stream for encoding raw Avro data without container format.
/**
 * Duplex stream that encodes written values as raw Avro bytes (no container framing).
 */
class RawEncoder extends stream.Duplex {
  /**
   * Create raw encoder stream.
   * @param schema - Avro schema (or resolved `Type`) used to encode written values.
   * @param opts - Encoder options.
   */
  constructor(schema: Type | any, opts?);
}

Usage Examples:
const avsc = require('avsc');
const schema = {
type: 'record',
name: 'Point',
fields: [
{name: 'x', type: 'double'},
{name: 'y', type: 'double'}
]
};
const encoder = new avsc.streams.RawEncoder(schema);
encoder.on('data', (buffer) => {
console.log('Encoded bytes:', buffer);
});
// Encode coordinates
encoder.write({x: 1.5, y: 2.7});
encoder.write({x: -0.5, y: 3.14});
encoder.end();All streaming classes emit standard Node.js stream events plus additional Avro-specific events:
/**
 * Events emitted by the streaming classes: the standard Node.js stream
 * events plus Avro-specific additions.
 */
interface StreamEvents {
  /** Standard stream events. */
  'data': (chunk: any) => void;
  'end': () => void;
  'error': (err: Error) => void;
  'close': () => void;
  /** Avro-specific: emitted by `BlockDecoder` only, once the container header is parsed. */
  'metadata': (type: Type, codec: string, header: any) => void;
}

Usage Examples:
const decoder = new avsc.streams.BlockDecoder();

// Handle metadata when available
decoder.on('metadata', (type, codec, header) => {
  console.log('Schema:', type.schema());
  console.log('Compression:', codec);
  console.log('File metadata:', header.meta);
});

// Handle records
decoder.on('data', (record) => {
  console.log('Record:', record);
});

// Handle errors
decoder.on('error', (err) => {
  console.error('Stream error:', err);
});

interface StreamOptions {
  /** Transform hook applied to each record. */
  transform?: (record: any) => any;
  /** Custom compression codecs, keyed by codec name. */
  codecs?: {[name: string]: Codec};
  /** Whether to emit raw buffers instead of decoded values. */
  noDecode?: boolean;
}
interface BlockEncoderOptions extends StreamOptions {
/** Target block size in bytes before a block is flushed. */
blockSize?: number;
/** Name of the compression codec applied to each block (e.g. 'deflate'). */
codec?: string;
/** Whether to write the container header. */
writeHeader?: boolean;
/** Custom synchronization marker separating blocks (16 bytes per the Avro container spec). */
syncMarker?: Buffer;
}
interface BlockDecoderOptions extends StreamOptions {
/** Reader schema, enabling schema evolution from the file's writer schema. */
readerSchema?: Type | any;
/** Hook invoked to parse the schema found in the container header into a `Type`. */
parseHook?: (schema: any) => Type;
}
/**
 * Asynchronous (de)compression codec: receives a buffer and invokes a
 * node-style callback with the transformed buffer (or an error).
 */
type Codec = (buffer: Buffer, callback: (err: Error | null, result?: Buffer) => void) => void;

Low-level streaming classes for RPC frame encoding and decoding.
/** Duplex stream that decodes RPC frames from a byte stream. */
class FrameDecoder extends stream.Duplex {
/**
 * Create frame decoder stream.
 * @param opts - Decoder options.
 */
constructor(opts);
}
/** Duplex stream that encodes messages into RPC frames. */
class FrameEncoder extends stream.Duplex {
/**
 * Create frame encoder stream.
 * @param opts - Encoder options.
 */
constructor(opts);
}
/** Duplex stream that decodes Netty-format RPC frames from a byte stream. */
class NettyDecoder extends stream.Duplex {
/**
 * Create Netty-format frame decoder stream.
 * @param opts - Decoder options.
 */
constructor(opts);
}
/** Duplex stream that encodes messages into Netty-format RPC frames. */
class NettyEncoder extends stream.Duplex {
/**
 * Create Netty-format frame encoder stream.
 * @param opts - Encoder options.
 */
constructor(opts);
}