Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding
npx @tessl/cli install tessl/npm-aws-kinesis-agg@4.2.0
AWS Kinesis Aggregation is a Node.js module that simplifies working with Amazon Kinesis records using the Kinesis Aggregated Record Format and Protocol Buffers encoding. It enables efficient packing of multiple user records into larger aggregated records to maximize throughput and reduce costs when working with Amazon Kinesis streams.
npm install aws-kinesis-agg
const agg = require('aws-kinesis-agg');
For ES modules/TypeScript:
import { deaggregateSync, deaggregate, aggregate, RecordAggregator } from 'aws-kinesis-agg';

// Basic usage (CommonJS)
const agg = require('aws-kinesis-agg');

// Deaggregate a Kinesis record synchronously.
// `kinesisRecord` is a placeholder for a KinesisStreamRecordPayload, for example
// the `kinesis` property of a record in an AWS Lambda Kinesis event.
agg.deaggregateSync(kinesisRecord, true, (err, userRecords) => {
  if (err) {
    console.error('Deaggregation failed:', err);
    return;
  }
  // `userRecords` is declared optional in the callback type, so default it.
  const extracted = userRecords ?? [];
  console.log('Found', extracted.length, 'user records');
  extracted.forEach(record => {
    // UserRecord.data is base64 encoded; decode it to show the original payload
    // (toString('utf8') assumes the payload is text).
    const payload = Buffer.from(record.data, 'base64').toString('utf8');
    console.log('Record:', record.partitionKey, payload);
  });
});

// Aggregate multiple records
const records = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') }
];
agg.aggregate(records,
  (encodedRecord, callback) => {
    // Handle encoded record (e.g., send to Kinesis), then signal completion by
    // calling callback() -- or callback(err) if handling failed.
    console.log('Encoded record size:', encodedRecord.data.length);
    callback();
  },
  () => {
    console.log('All records processed');
  },
  (err, data) => {
    console.error('Error:', err);
    // `data`, when present, is presumably the EncodedRecord involved in the failure.
    if (data) {
      console.error('Affected encoded record partition key:', data.partitionKey);
    }
  }
);
AWS Kinesis Aggregation is built around several key components:
Extract individual user records from aggregated Kinesis records. Supports both synchronous and asynchronous processing with optional checksum validation.
/**
 * Extracts the user records contained in one Kinesis record and delivers them
 * together, as a single array, to `afterRecordCallback`.
 *
 * @param kinesisRecord - The Kinesis record payload to deaggregate.
 * @param computeChecksums - Whether to validate the aggregated record's checksum.
 * @param afterRecordCallback - Invoked with an error on failure, or with the
 *   array of extracted user records on success.
 *   NOTE(review): `err` is typed as `Error`, but callers test `if (err)`, so it is
 *   presumably null/undefined on success — confirm before relying on this type
 *   under `strictNullChecks`.
 */
function deaggregateSync(
kinesisRecord: KinesisStreamRecordPayload,
computeChecksums: boolean,
afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;
/**
 * Deaggregates one Kinesis record, delivering user records one at a time
 * instead of collecting them into an array as `deaggregateSync` does.
 *
 * @param kinesisRecord - The Kinesis record payload to deaggregate.
 * @param computeChecksums - Whether to validate the aggregated record's checksum.
 * @param perRecordCallback - Receives each extracted user record, or an error.
 * @param afterRecordCallback - Presumably called once after all user records of
 *   `kinesisRecord` have been delivered. `err` is set on failure, and
 *   `errorUserRecord` presumably identifies the record being processed when the
 *   failure occurred — confirm against the implementation.
 */
function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
Aggregate multiple user records into efficiently packed Kinesis records. Includes queue-based processing with configurable concurrency.
/**
 * Packs `records` into aggregated Kinesis records and passes each encoded
 * record to `encodedRecordHandler`.
 *
 * @param records - User records to aggregate. Typed `any[]`; callers pass
 *   objects shaped like `InputUserRecord` (`partitionKey` plus `data`).
 * @param encodedRecordHandler - Receives each encoded (aggregated) record, e.g.
 *   to send it to Kinesis. It must invoke `callback` when done, passing an error
 *   if handling failed.
 * @param afterPutAggregatedRecords - Called once all records have been processed.
 * @param errorCallback - Called on failure; `data` presumably carries the
 *   EncodedRecord involved — confirm against the implementation.
 * @param queueSize - Optional size of the internal processing queue, presumably
 *   bounding how many encoded records are handled concurrently. The default is
 *   not shown here — confirm.
 */
function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;
Fine-grained control over record aggregation with size management, buffering, and manual flushing capabilities.
/**
 * Incremental aggregator: buffers user records and encodes them into
 * aggregated records, with size checks and manual flushing.
 *
 * Only signatures are listed here; behaviour notes marked "presumably" are
 * inferred from names and should be confirmed against the implementation.
 */
class RecordAggregator {
// onReadyCallback, if given, receives encoded records as they become ready
// (presumably when the buffer fills or on flush).
constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);
// Adds one user record to the aggregator's buffer.
addUserRecord(record: InputUserRecord): void;
// Encodes the buffered records into one EncodedRecord; returns undefined,
// presumably when nothing is buffered.
build(): EncodedRecord | undefined;
// Manually flushes buffered records, delivering the result to onReadyCallback
// (presumably falling back to the previously set callback when omitted).
flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
// Presumably the number of user records currently buffered.
length(): number;
// Presumably whether `record` can be added without exceeding the
// aggregated-record size limit.
checkIfUserRecordFits(record: InputUserRecord): boolean;
// Size `record` would contribute to the aggregated record (presumably in bytes).
calculateUserRecordSize(record: InputUserRecord): number;
// Discards buffered records (presumably without emitting them).
clearRecords(): void;
// Replaces the ready callback.
// NOTE(review): the returned `Function` is untyped and its meaning is not shown
// here; tighten the type once the implementation is confirmed.
setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;
// Adds every record in `records`; when `forceFlush` is true, presumably also
// flushes whatever remains buffered at the end.
aggregateRecords(records: InputUserRecord[], forceFlush: boolean, onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
}
// Deaggregated user record (output from deaggregation functions)
/**
 * A single user record produced by the deaggregation functions.
 */
interface UserRecord {
partitionKey: string;
// Optional explicit key carried by the record, if one was set.
// NOTE(review): named `explicitPartitionKey` here but `explicitHashKey` on
// InputUserRecord — presumably the same concept; confirm.
explicitPartitionKey?: string;
// Sequence number of the enclosing Kinesis record (presumably shared by every
// user record extracted from it — confirm).
sequenceNumber: string;
// Position of this user record within the aggregated record
// (presumably 0-based — confirm).
subSequenceNumber: number;
data: string; // base64 encoded
}
// Input record for aggregation
/**
 * Shape of a user record accepted by RecordAggregator (addUserRecord,
 * aggregateRecords, checkIfUserRecordFits, calculateUserRecordSize).
 */
interface InputUserRecord {
partitionKey: string;
// Payload as a Buffer or a string.
// NOTE(review): how a string payload is encoded (UTF-8 vs base64) is not shown
// here — confirm.
data: Buffer | string;
// Optional explicit hash key. EncodedRecord spells its corresponding field
// `ExplicitHashKey` (capital E).
explicitHashKey?: string;
}
/**
 * An aggregated record produced by aggregation (RecordAggregator.build, the
 * onReady callbacks, and aggregate's encodedRecordHandler), e.g. to be sent
 * to Kinesis.
 */
interface EncodedRecord {
partitionKey: string;
// Encoded aggregated payload bytes.
data: Buffer;
// NOTE(review): PascalCase `ExplicitHashKey`, unlike the camelCase
// `explicitHashKey` on InputUserRecord. Presumably deliberate, since it matches
// the Kinesis PutRecord/PutRecords parameter name; verify before renaming.
ExplicitHashKey?: string;
}
/**
 * A Kinesis record as passed to deaggregateSync/deaggregate.
 * NOTE(review): the fields resemble the `kinesis` object of an AWS Lambda
 * Kinesis event record — confirm whether that is the intended source.
 */
interface KinesisStreamRecordPayload {
partitionKey: string;
// Optional explicit key, if the record carries one.
explicitPartitionKey?: string;
sequenceNumber: string;
// Record payload; presumably base64-encoded (as with Lambda event records) — confirm.
data: string;
}