CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-aws-kinesis-agg

Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding

Overview
Eval results
Files

docs/record-aggregator.md

Record Aggregator Class

The RecordAggregator class provides fine-grained control over record aggregation with manual size management, buffering, and flushing capabilities. It's ideal for applications that need precise control over when aggregated records are generated and sent.

Capabilities

Constructor

Creates a new RecordAggregator instance with an optional callback for when records are ready.

/**
 * Create a new RecordAggregator instance
 * @param onReadyCallback - Optional callback for when aggregated records are ready
 */
constructor(onReadyCallback?: (err: Error | null, encodedRecord?: EncodedRecord) => void);

Usage Example:

const { RecordAggregator } = require('aws-kinesis-agg');

// Create aggregator with callback
const aggregator = new RecordAggregator((err, encodedRecord) => {
  if (err) {
    console.error('Aggregation error:', err);
  } else {
    console.log('Record ready:', encodedRecord.partitionKey, encodedRecord.data.length);
    // Send to Kinesis or other destination
  }
});

// Create aggregator without callback (manual handling)
const manualAggregator = new RecordAggregator();

Add User Record

Adds a user record to the aggregator's internal buffer. Automatically triggers the callback when the buffer reaches the size limit.

/**
 * Add a user record to the aggregator
 * @param record - The user record to add
 * @throws Error if record won't fit or is too large
 */
addUserRecord(record: InputUserRecord): void;

Usage Example:

const aggregator = new RecordAggregator((err, encodedRecord) => {
  // Handle the automatically generated encoded record
  sendToKinesis(encodedRecord);
});

try {
  // Add records one by one
  aggregator.addUserRecord({
    partitionKey: 'user-123',
    data: Buffer.from('user data 1')
  });
  
  aggregator.addUserRecord({
    partitionKey: 'user-456', 
    data: Buffer.from('user data 2'),
    explicitHashKey: 'shard-key-456'
  });
  
  // When the internal buffer reaches ~1MB, the callback will be triggered automatically
  
} catch (err) {
  console.error('Failed to add record:', err.message);
  // Possible errors:
  // - "Record.Data field is mandatory"
  // - "record.partitionKey field is mandatory" 
  // - "record won't fit"
  // - "Input record (...) is too large to fit inside a single Kinesis record"
}

Build Encoded Record

Manually builds an encoded record from all buffered user records and clears the buffer.

/**
 * Build an encoded record from buffered user records and clear the buffer
 * @returns The encoded record, or undefined if no records are buffered
 */
build(): EncodedRecord | undefined;

Usage Example:

const aggregator = new RecordAggregator();

// Add records manually
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });

// Manually build the encoded record
const encodedRecord = aggregator.build();
if (encodedRecord) {
  console.log('Built record:', encodedRecord.partitionKey, encodedRecord.data.length);
  // Send to Kinesis or other destination
  sendToKinesis(encodedRecord);
}

// Buffer is now empty, ready for more records
console.log('Records in buffer:', aggregator.length()); // 0

Flush Buffered Records

Triggers the callback with any buffered records and clears the buffer.

/**
 * Flush buffered records through the callback and clear the buffer
 * @param onReadyCallback - Optional callback, uses instance callback if not provided
 */
flushBufferedRecords(onReadyCallback?: (err: Error | null, encodedRecord?: EncodedRecord) => void): void;

Usage Example:

const aggregator = new RecordAggregator();

// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });

// Flush with custom callback
aggregator.flushBufferedRecords((err, encodedRecord) => {
  if (err) {
    console.error('Flush error:', err);
  } else if (encodedRecord) {
    console.log('Flushed record:', encodedRecord.data.length);
    sendToKinesis(encodedRecord);
  }
});

// Buffer is now empty

Record Count

Returns the number of user records currently in the buffer.

/**
 * Get the number of user records currently buffered
 * @returns Number of buffered records
 */
length(): number;

Check Record Fit

Checks if a user record would fit in the current buffer without adding it.

/**
 * Check if a user record would fit in the current buffer
 * @param record - The record to check
 * @returns True if the record would fit, false otherwise
 */
checkIfUserRecordFits(record: InputUserRecord): boolean;

Usage Example:

const aggregator = new RecordAggregator();

// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });

// Check if another record would fit
const largeRecord = { partitionKey: 'key2', data: Buffer.alloc(500000) }; // 500KB

if (aggregator.checkIfUserRecordFits(largeRecord)) {
  aggregator.addUserRecord(largeRecord);
  console.log('Large record added');
} else {
  // Flush current buffer first
  aggregator.flushBufferedRecords();
  // Then add the large record
  aggregator.addUserRecord(largeRecord);
  console.log('Buffer flushed, large record added');
}

Calculate Record Size

Calculates the size a user record would consume when encoded, without adding it to the buffer.

/**
 * Calculate the size a user record would consume when encoded
 * @param record - The record to calculate size for
 * @returns Size in bytes
 * @throws Error if record is invalid
 */
calculateUserRecordSize(record: InputUserRecord): number;

Usage Example:

const aggregator = new RecordAggregator();

const record = { partitionKey: 'test-key', data: Buffer.from('test data') };

try {
  const size = aggregator.calculateUserRecordSize(record);
  console.log(`Record would consume ${size} bytes when encoded`);
  
  // Use size information for batching decisions
  const remainingCapacity = 1048576 - getCurrentBufferSize(); // 1MB limit
  if (size <= remainingCapacity) {
    aggregator.addUserRecord(record);
  } else {
    console.log('Record too large for current buffer, flushing first');
    aggregator.flushBufferedRecords();
    aggregator.addUserRecord(record);
  }
} catch (err) {
  console.error('Invalid record:', err.message);
}

Clear Records

Resets the aggregator to empty state, discarding all buffered records.

/**
 * Reset aggregator to empty state, discarding all buffered records
 */
clearRecords(): void;

Set Callback

Updates the callback function used by automatic flushing and manual flush operations.

/**
 * Set or update the onReadyCallback function
 * @param onReadyCallback - The new callback function
 * @returns The current callback function
 */
setOnReadyCallback(onReadyCallback?: (err: Error | null, encodedRecord?: EncodedRecord) => void): Function;

Aggregate Records (Batch)

Adds multiple records at once with optional automatic flushing.

/**
 * Add multiple records to the aggregator
 * @param records - Array of user records to add
 * @param forceFlush - If true, flush buffered records at the end
 * @param onReadyCallback - Optional callback for this operation
 */
aggregateRecords(
  records: InputUserRecord[],
  forceFlush: boolean,
  onReadyCallback?: (err: Error | null, encodedRecord?: EncodedRecord) => void
): void;

Usage Example:

const aggregator = new RecordAggregator();

const batchRecords = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') },
  { partitionKey: 'key3', data: Buffer.from('data3') }
];

// Add all records and force flush
aggregator.aggregateRecords(batchRecords, true, (err, encodedRecord) => {
  if (err) {
    console.error('Batch aggregation error:', err);
  } else if (encodedRecord) {
    console.log('Batch encoded:', encodedRecord.data.length);
    sendToKinesis(encodedRecord);
  }
});

Advanced Usage Patterns

Stream Processing

const { RecordAggregator } = require('aws-kinesis-agg');
const stream = require('stream');

class KinesisAggregatorStream extends stream.Transform {
  constructor(options = {}) {
    super({ objectMode: true });
    
    this.aggregator = new RecordAggregator((err, encodedRecord) => {
      if (err) {
        this.emit('error', err);
      } else {
        this.push(encodedRecord);
      }
    });
  }
  
  _transform(record, encoding, callback) {
    try {
      if (this.aggregator.checkIfUserRecordFits(record)) {
        this.aggregator.addUserRecord(record);
      } else {
        // Flush current buffer, then add new record
        this.aggregator.flushBufferedRecords();
        this.aggregator.addUserRecord(record);
      }
      callback();
    } catch (err) {
      callback(err);
    }
  }
  
  _flush(callback) {
    // Flush any remaining records
    this.aggregator.flushBufferedRecords();
    callback();
  }
}

Size-Based Batching

const aggregator = new RecordAggregator();
const TARGET_SIZE = 512 * 1024; // 512KB target size

function addRecordWithSizeControl(record) {
  const recordSize = aggregator.calculateUserRecordSize(record);
  const currentRecords = aggregator.length();
  
  // Estimate current buffer size (approximation)
  const estimatedCurrentSize = currentRecords * 1000; // Rough estimate
  
  if (estimatedCurrentSize + recordSize > TARGET_SIZE) {
    // Flush before adding if we'd exceed target size
    aggregator.flushBufferedRecords();
  }
  
  aggregator.addUserRecord(record);
}

Types

interface InputUserRecord {
  /** The partition key for this user record */
  partitionKey: string;
  /** The data payload for this user record */
  data: Buffer | string;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}

interface EncodedRecord {
  /** The partition key for the aggregated record */
  partitionKey: string;
  /** The encoded aggregated record data */
  data: Buffer;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}

Install with Tessl CLI

npx tessl i tessl/npm-aws-kinesis-agg

docs

aggregation.md

deaggregation.md

index.md

record-aggregator.md

tile.json