Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding
Record deaggregation extracts individual user records from aggregated Kinesis records created by the Kinesis Producer Library (KPL) or other aggregation tools. The module provides both synchronous and asynchronous deaggregation methods with optional checksum validation.
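The framing that deaggregation depends on can be sketched as follows. A KPL-aggregated record starts with the four magic bytes 0xF3 0x89 0x9A 0xC2. These are followed by a Protocol Buffers `AggregatedRecord` message and then a trailing 16-byte MD5 digest of that message. The helpers `isAggregated` and `hasValidChecksum` are hypothetical illustrations of this layout. They are not functions exported by aws-kinesis-agg, and they do not parse the protobuf body. They also expect raw bytes, so a Lambda payload's base64 `data` must first be decoded with `Buffer.from(payload.data, 'base64')`.

```javascript
const crypto = require('crypto');

// KPL aggregated records begin with these four magic bytes
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
// ...and end with a 16-byte MD5 digest of the protobuf message in between
const MD5_LENGTH = 16;

// Returns true if the raw record bytes carry the KPL aggregation framing
function isAggregated(recordBytes) {
  return (
    recordBytes.length >= KPL_MAGIC.length + MD5_LENGTH &&
    recordBytes.subarray(0, KPL_MAGIC.length).equals(KPL_MAGIC)
  );
}

// Recomputes the MD5 of the protobuf body and compares it with the trailing digest
function hasValidChecksum(recordBytes) {
  if (!isAggregated(recordBytes)) {
    return false;
  }
  const body = recordBytes.subarray(KPL_MAGIC.length, recordBytes.length - MD5_LENGTH);
  const storedDigest = recordBytes.subarray(recordBytes.length - MD5_LENGTH);
  const computedDigest = crypto.createHash('md5').update(body).digest();
  return computedDigest.equals(storedDigest);
}

// Example: frame arbitrary bytes by hand (not a real protobuf message) and check them
const exampleBody = Buffer.from('example body');
const exampleFramed = Buffer.concat([
  KPL_MAGIC,
  exampleBody,
  crypto.createHash('md5').update(exampleBody).digest(),
]);
console.log(isAggregated(exampleFramed), hasValidChecksum(exampleFramed)); // true true
```

Flipping any byte between the magic number and the digest makes `hasValidChecksum` return false. This is the kind of corruption that checksum validation is intended to catch.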
Extracts all user records from an aggregated Kinesis record and passes them to a callback as an array.
/**
* Synchronously deaggregate a Kinesis record into user records
* @param kinesisRecord - The Kinesis record to deaggregate
* @param computeChecksums - Whether to validate record checksums
* @param afterRecordCallback - Callback with error and array of user records
*/
function deaggregateSync(
kinesisRecord: KinesisStreamRecordPayload,
computeChecksums: boolean,
afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;
Usage Example:
const agg = require('aws-kinesis-agg');
// Process each Kinesis record in a Lambda event
exports.handler = (event, context) => {
  event.Records.forEach(kinesisRecord => {
    agg.deaggregateSync(kinesisRecord.kinesis, true, (err, userRecords) => {
      if (err) {
        console.error('Deaggregation failed:', err);
        return;
      }
      console.log(`Found ${userRecords.length} user records`);
      userRecords.forEach((record, index) => {
        // record.data is base64-encoded; decode it to recover the original bytes
        const userData = Buffer.from(record.data, 'base64');
        console.log(`Record ${index}:`, {
          partitionKey: record.partitionKey,
          sequenceNumber: record.sequenceNumber,
          subSequenceNumber: record.subSequenceNumber,
          dataLength: userData.length // decoded size in bytes
        });
        // Process the individual user record (processUserData is your own function)
        processUserData(userData);
      });
    });
  });
};
Processes user records one at a time through a callback, allowing for streaming processing of large aggregated records.
/**
* Asynchronously deaggregate a Kinesis record, calling perRecordCallback for each user record
* @param kinesisRecord - The Kinesis record to deaggregate
* @param computeChecksums - Whether to validate record checksums
* @param perRecordCallback - Called for each extracted user record
* @param afterRecordCallback - Called when all records have been processed, or on error
*/
function deaggregate(
kinesisRecord: KinesisStreamRecordPayload,
computeChecksums: boolean,
perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
Usage Example:
const agg = require('aws-kinesis-agg');
// Process records asynchronously (kinesisRecord is one entry of event.Records)
agg.deaggregate(
  kinesisRecord.kinesis,
  true,
  // Per-record callback
  (err, userRecord) => {
    if (err) {
      console.error('Error processing user record:', err);
      return;
    }
    // Process the individual user record immediately
    console.log('Processing user record:', userRecord.partitionKey);
    const userData = Buffer.from(userRecord.data, 'base64');
    // Async processing, e.g. sending to another service (processUserRecordAsync is your own function)
    processUserRecordAsync(userData);
  },
  // After-record callback
  (err, errorRecord) => {
    if (err) {
      console.error('Deaggregation completed with errors:', err);
      if (errorRecord) {
        console.error('Error record details:', errorRecord);
      }
    } else {
      console.log('Deaggregation completed successfully');
    }
  }
);
The computeChecksums parameter controls whether the module validates the MD5 checksum embedded in aggregated records.
// Enable checksum validation (recommended)
agg.deaggregateSync(record, true, callback);
// Disable checksum validation (faster but less safe)
agg.deaggregateSync(record, false, callback);
Important Notes:
The deaggregation functions handle several error scenarios, including:
- Checksum validation failure, when computeChecksums is true and the checksum does not match
For asynchronous deaggregation, errors during perRecordCallback execution trigger the afterRecordCallback with the error details and information about the problematic record.
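The afterRecordCallback signals that deaggregation has finished. However, perRecordCallback returns nothing, so asynchronous work started inside it may still be running when afterRecordCallback fires. The sketch below wraps a deaggregate-style function in a Promise. That Promise settles only after every per-record handler has finished, and it rejects with the first error reported. It assumes Node.js 12.9+ for `Promise.allSettled`. `deaggregateAndProcess` and its `handler` argument are hypothetical names. The deaggregate function is passed in as an argument, so the sketch can be tried without the package installed.

```javascript
// Hypothetical helper: `deaggregate` is expected to take the same arguments as
// agg.deaggregate (payload, computeChecksums, perRecordCallback, afterRecordCallback).
function deaggregateAndProcess(deaggregate, payload, handler, computeChecksums = true) {
  return new Promise((resolve, reject) => {
    const pending = [];
    let perRecordError = null;

    deaggregate(
      payload,
      computeChecksums,
      // Per-record callback: start the (possibly async) handler and keep its promise
      (err, userRecord) => {
        if (err) {
          perRecordError = perRecordError || err;
          return;
        }
        const work = Promise.resolve().then(() => handler(userRecord));
        work.catch(() => {}); // mark as handled; the outcome is inspected via allSettled below
        pending.push(work);
      },
      // After-record callback: wait for every handler, then report the first failure, if any
      (err) => {
        Promise.allSettled(pending).then((results) => {
          const failed = results.find((r) => r.status === 'rejected');
          if (err || perRecordError || failed) {
            reject(err || perRecordError || failed.reason);
          } else {
            resolve(results.map((r) => r.value));
          }
        });
      }
    );
  });
}
```

In a Lambda handler you might call `await deaggregateAndProcess(agg.deaggregate, record.kinesis, handleUserRecord)` for each entry in `event.Records`, where `handleUserRecord` is your own function. Waiting for all handlers to settle, rather than failing fast, ensures that no rejection goes unobserved.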
The module automatically handles both AWS Lambda event formats:
- Lowercase property names (data, partitionKey)
- Capitalized property names (Data, PartitionKey)
Records are automatically converted between formats as needed, ensuring compatibility across different Lambda runtime versions.
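The kind of conversion described above can be illustrated with a hypothetical `normalizePayload` helper. It is not part of the module's API and not its actual implementation. It maps either naming style onto the lowercase KinesisStreamRecordPayload shape. The sketch assumes that a capitalized `Data` field may hold raw bytes (for example, records fetched with the AWS SDK's GetRecords call) rather than a base64 string.

```javascript
// Hypothetical helper (not part of aws-kinesis-agg): maps a payload that uses either
// lowercase (data, partitionKey) or capitalized (Data, PartitionKey) property names
// onto the lowercase KinesisStreamRecordPayload shape.
function normalizePayload(payload) {
  // Prefer the lowercase property, fall back to the capitalized one
  const pick = (lower, upper) =>
    payload[lower] !== undefined ? payload[lower] : payload[upper];

  let data = pick('data', 'Data');
  // Assumption: capitalized payloads may carry raw bytes instead of base64,
  // so encode them to match the lowercase format (Buffer is a Uint8Array subclass)
  if (data instanceof Uint8Array) {
    data = Buffer.from(data).toString('base64');
  }

  return {
    partitionKey: pick('partitionKey', 'PartitionKey'),
    sequenceNumber: pick('sequenceNumber', 'SequenceNumber'),
    data,
  };
}
```

For example, `normalizePayload({ PartitionKey: 'pk', SequenceNumber: '1', Data: Buffer.from('hi') })` yields `{ partitionKey: 'pk', sequenceNumber: '1', data: 'aGk=' }`. An already-lowercase payload passes through unchanged.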
interface KinesisStreamRecordPayload {
/** The partition key of the record */
partitionKey: string;
/** Optional explicit partition key for shard allocation */
explicitPartitionKey?: string;
/** The sequence number assigned by Kinesis */
sequenceNumber: string;
/** Base64-encoded record data */
data: string;
}
interface UserRecord {
/** The partition key provided when the record was submitted */
partitionKey: string;
/** The explicit hash key for shard allocation (if provided) */
explicitPartitionKey?: string;
/** The sequence number of the containing aggregated record */
sequenceNumber: string;
/** The position of this user record within the aggregated record */
subSequenceNumber: number;
/** The original data transmitted by the producer (base64 encoded) */
data: string;
}
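Every user record taken from the same aggregated record shares its sequenceNumber, so the pair (sequenceNumber, subSequenceNumber) is what distinguishes individual user records. The hypothetical `decodeUserRecordJson` helper below uses that pair as an identifier and decodes `data`. It assumes that the producer wrote UTF-8 JSON. If your producer writes another format, adapt the parsing step.

```javascript
// Hypothetical helper: decodes a deaggregated UserRecord whose base64 `data`
// is assumed to contain UTF-8 encoded JSON written by the producer.
function decodeUserRecordJson(userRecord) {
  const bytes = Buffer.from(userRecord.data, 'base64');
  return {
    partitionKey: userRecord.partitionKey,
    // sequenceNumber belongs to the containing aggregated record, so the
    // subSequenceNumber is needed to identify this particular user record
    id: `${userRecord.sequenceNumber}:${userRecord.subSequenceNumber}`,
    body: JSON.parse(bytes.toString('utf8')),
  };
}
```

This composite id can serve as a deduplication key when records are redelivered, because it is stable for a given user record.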