Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding
The RecordAggregator class provides fine-grained control over record aggregation with manual size management, buffering, and flushing capabilities. It's ideal for applications that need precise control over when aggregated records are generated and sent.
Creates a new RecordAggregator instance with an optional callback for when records are ready.
/**
* Create a new RecordAggregator instance
* @param onReadyCallback - Optional callback for when aggregated records are ready
*/
constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);

Usage Example:
const { RecordAggregator } = require('aws-kinesis-agg');
// Create aggregator with callback
const aggregator = new RecordAggregator((err, encodedRecord) => {
if (err) {
console.error('Aggregation error:', err);
} else {
console.log('Record ready:', encodedRecord.partitionKey, encodedRecord.data.length);
// Send to Kinesis or other destination
}
});
// Create aggregator without callback (manual handling)
const manualAggregator = new RecordAggregator();

Adds a user record to the aggregator's internal buffer. Automatically triggers the callback when the buffer reaches the size limit.
/**
* Add a user record to the aggregator
* @param record - The user record to add
* @throws Error if record won't fit or is too large
*/
addUserRecord(record: InputUserRecord): void;

Usage Example:
const aggregator = new RecordAggregator((err, encodedRecord) => {
// Handle the automatically generated encoded record
sendToKinesis(encodedRecord);
});
try {
// Add records one by one
aggregator.addUserRecord({
partitionKey: 'user-123',
data: Buffer.from('user data 1')
});
aggregator.addUserRecord({
partitionKey: 'user-456',
data: Buffer.from('user data 2'),
explicitHashKey: 'shard-key-456'
});
// When the internal buffer reaches ~1MB, the callback will be triggered automatically
} catch (err) {
console.error('Failed to add record:', err.message);
// Possible errors:
// - "Record.Data field is mandatory"
// - "record.partitionKey field is mandatory"
// - "record won't fit"
// - "Input record (...) is too large to fit inside a single Kinesis record"
}

Manually builds an encoded record from all buffered user records and clears the buffer.
/**
* Build an encoded record from buffered user records and clear the buffer
* @returns The encoded record, or undefined if no records are buffered
*/
build(): EncodedRecord | undefined;

Usage Example:
const aggregator = new RecordAggregator();
// Add records manually
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });
// Manually build the encoded record
const encodedRecord = aggregator.build();
if (encodedRecord) {
console.log('Built record:', encodedRecord.partitionKey, encodedRecord.data.length);
// Send to Kinesis or other destination
sendToKinesis(encodedRecord);
}
// Buffer is now empty, ready for more records
console.log('Records in buffer:', aggregator.length()); // 0

Triggers the callback with any buffered records and clears the buffer.
/**
* Flush buffered records through the callback and clear the buffer
* @param onReadyCallback - Optional callback, uses instance callback if not provided
*/
flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;

Usage Example:
const aggregator = new RecordAggregator();
// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });
// Flush with custom callback
aggregator.flushBufferedRecords((err, encodedRecord) => {
if (err) {
console.error('Flush error:', err);
} else if (encodedRecord) {
console.log('Flushed record:', encodedRecord.data.length);
sendToKinesis(encodedRecord);
}
});
// Buffer is now empty

Returns the number of user records currently in the buffer.
/**
* Get the number of user records currently buffered
* @returns Number of buffered records
*/
length(): number;

Checks if a user record would fit in the current buffer without adding it.
/**
* Check if a user record would fit in the current buffer
* @param record - The record to check
* @returns True if the record would fit, false otherwise
*/
checkIfUserRecordFits(record: InputUserRecord): boolean;

Usage Example:
const aggregator = new RecordAggregator();
// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
// Check if another record would fit
const largeRecord = { partitionKey: 'key2', data: Buffer.alloc(500000) }; // 500KB
if (aggregator.checkIfUserRecordFits(largeRecord)) {
aggregator.addUserRecord(largeRecord);
console.log('Large record added');
} else {
// Flush current buffer first
aggregator.flushBufferedRecords();
// Then add the large record
aggregator.addUserRecord(largeRecord);
console.log('Buffer flushed, large record added');
}

Calculates the size a user record would consume when encoded, without adding it to the buffer.
/**
* Calculate the size a user record would consume when encoded
* @param record - The record to calculate size for
* @returns Size in bytes
* @throws Error if record is invalid
*/
calculateUserRecordSize(record: InputUserRecord): number;

Usage Example:
const aggregator = new RecordAggregator();
const record = { partitionKey: 'test-key', data: Buffer.from('test data') };
try {
const size = aggregator.calculateUserRecordSize(record);
console.log(`Record would consume ${size} bytes when encoded`);
// Use size information for batching decisions
const remainingCapacity = 1048576 - getCurrentBufferSize(); // 1MB limit
if (size <= remainingCapacity) {
aggregator.addUserRecord(record);
} else {
console.log('Record too large for current buffer, flushing first');
aggregator.flushBufferedRecords();
aggregator.addUserRecord(record);
}
} catch (err) {
console.error('Invalid record:', err.message);
}

Resets the aggregator to empty state, discarding all buffered records.
/**
* Reset aggregator to empty state, discarding all buffered records
*/
clearRecords(): void;

Updates the callback function used by automatic flushing and manual flush operations.
/**
* Set or update the onReadyCallback function
* @param onReadyCallback - The new callback function
* @returns The current callback function
*/
setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;

Adds multiple records at once with optional automatic flushing.
/**
* Add multiple records to the aggregator
* @param records - Array of user records to add
* @param forceFlush - If true, flush buffered records at the end
* @param onReadyCallback - Optional callback for this operation
*/
aggregateRecords(
records: InputUserRecord[],
forceFlush: boolean,
onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void
): void;

Usage Example:
const aggregator = new RecordAggregator();
const batchRecords = [
{ partitionKey: 'key1', data: Buffer.from('data1') },
{ partitionKey: 'key2', data: Buffer.from('data2') },
{ partitionKey: 'key3', data: Buffer.from('data3') }
];
// Add all records and force flush
aggregator.aggregateRecords(batchRecords, true, (err, encodedRecord) => {
if (err) {
console.error('Batch aggregation error:', err);
} else if (encodedRecord) {
console.log('Batch encoded:', encodedRecord.data.length);
sendToKinesis(encodedRecord);
}
});

Streaming integration example (using RecordAggregator inside a Node.js Transform stream):
const { RecordAggregator } = require('aws-kinesis-agg');
const stream = require('stream');
class KinesisAggregatorStream extends stream.Transform {
constructor(options = {}) {
super({ objectMode: true });
this.aggregator = new RecordAggregator((err, encodedRecord) => {
if (err) {
this.emit('error', err);
} else {
this.push(encodedRecord);
}
});
}
_transform(record, encoding, callback) {
try {
if (this.aggregator.checkIfUserRecordFits(record)) {
this.aggregator.addUserRecord(record);
} else {
// Flush current buffer, then add new record
this.aggregator.flushBufferedRecords();
this.aggregator.addUserRecord(record);
}
callback();
} catch (err) {
callback(err);
}
}
_flush(callback) {
// Flush any remaining records
this.aggregator.flushBufferedRecords();
callback();
}
}

Size-controlled batching example (flushing based on an estimated buffer size):
const aggregator = new RecordAggregator();
const TARGET_SIZE = 512 * 1024; // 512KB target size
function addRecordWithSizeControl(record) {
const recordSize = aggregator.calculateUserRecordSize(record);
const currentRecords = aggregator.length();
// Estimate current buffer size (approximation)
const estimatedCurrentSize = currentRecords * 1000; // Rough estimate
if (estimatedCurrentSize + recordSize > TARGET_SIZE) {
// Flush before adding if we'd exceed target size
aggregator.flushBufferedRecords();
}
aggregator.addUserRecord(record);
}

Type definitions:
interface InputUserRecord {
/** The partition key for this user record */
partitionKey: string;
/** The data payload for this user record */
data: Buffer | string;
/** Optional explicit hash key for shard allocation */
explicitHashKey?: string;
}
interface EncodedRecord {
/** The partition key for the aggregated record */
partitionKey: string;
/** The encoded aggregated record data */
data: Buffer;
/** Optional explicit hash key for shard allocation */
ExplicitHashKey?: string;
}

Install with Tessl CLI
npx tessl i tessl/npm-aws-kinesis-agg