
tessl/npm-aws-kinesis-agg

Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding


Record Aggregation

Record aggregation combines multiple user records into efficiently packed Kinesis records using the Kinesis Aggregated Record Format. This reduces the number of Kinesis API calls and maximizes throughput while staying within the 1MB record size limit.

Capabilities

Aggregate Function

Aggregates multiple user records into encoded records ready for Kinesis transmission. Uses a queue-based approach with configurable concurrency for handling encoded records.

/**
 * Aggregate multiple user records into encoded Kinesis records
 * @param records - Array of user records to aggregate
 * @param encodedRecordHandler - Function to handle each encoded record
 * @param afterPutAggregatedRecords - Called when all records are processed
 * @param errorCallback - Called when errors occur during processing
 * @param queueSize - Optional concurrency limit for encoded record processing (default: 1)
 */
function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;

Usage Example:

const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

// Prepare user records
const userRecords = [
  { partitionKey: 'user-123', data: Buffer.from(JSON.stringify({ event: 'login', userId: '123' })) },
  { partitionKey: 'user-456', data: Buffer.from(JSON.stringify({ event: 'logout', userId: '456' })) },
  { partitionKey: 'user-789', data: Buffer.from(JSON.stringify({ event: 'purchase', userId: '789' })) }
];

// Aggregate and send to Kinesis
agg.aggregate(
  userRecords,
  // Encoded record handler - called for each aggregated record
  (encodedRecord, callback) => {
    const params = {
      Data: encodedRecord.data,
      PartitionKey: encodedRecord.partitionKey,
      StreamName: 'my-kinesis-stream'
    };
    
    // Add explicit hash key if present
    if (encodedRecord.ExplicitHashKey) {
      params.ExplicitHashKey = encodedRecord.ExplicitHashKey;
    }
    
    // Send to Kinesis
    kinesis.putRecord(params, (err, result) => {
      if (err) {
        console.error('Failed to put record:', err);
      } else {
        console.log('Successfully put record:', result.SequenceNumber);
      }
      callback(err, result);
    });
  },
  // After all records processed
  () => {
    console.log('All aggregated records have been sent to Kinesis');
  },
  // Error handler
  (err, data) => {
    console.error('Aggregation error:', err);
    if (data) {
      console.error('Failed record:', data);
    }
  },
  // Optional: concurrency for encoded record processing
  3
);

AWS Lambda Integration

A common pattern for AWS Lambda functions is to process incoming events and re-emit them as aggregated records:

const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

exports.handler = async (event, context) => {
  // Process incoming events and create user records
  const userRecords = event.Records.map(record => {
    // Transform the incoming record
    const processedData = processIncomingRecord(record);
    
    return {
      partitionKey: processedData.userId,
      data: Buffer.from(JSON.stringify(processedData)),
      explicitHashKey: processedData.shardKey // Optional
    };
  });
  
  return new Promise((resolve, reject) => {
    agg.aggregate(
      userRecords,
      (encodedRecord, callback) => {
        // Send aggregated record to output stream
        kinesis.putRecord({
          Data: encodedRecord.data,
          PartitionKey: encodedRecord.partitionKey,
          StreamName: process.env.OUTPUT_STREAM_NAME,
          ExplicitHashKey: encodedRecord.ExplicitHashKey
        }, callback);
      },
      () => {
        console.log(`Successfully aggregated and sent ${userRecords.length} records`);
        resolve({ statusCode: 200 });
      },
      (err, data) => {
        console.error('Aggregation failed:', err);
        reject(err);
      }
    );
  });
};

Record Validation

The aggregation function validates input records and provides detailed error messages:

// Records must have required fields
const validRecord = {
  partitionKey: 'required-key',  // Required
  data: Buffer.from('data'),     // Required
  ExplicitHashKey: 'optional'    // Optional
};

// These will cause errors:
const invalidRecords = [
  { data: Buffer.from('data') },              // Missing partitionKey
  { partitionKey: 'key' },                    // Missing data
  { partitionKey: 'key', data: null }         // Invalid data
];
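The checks above can also be run before calling aggregate. The helper below is a hypothetical pre-flight validator (its name and return shape are illustrative, not part of the aws-kinesis-agg API), sketched from the rules listed above:

```javascript
// Hypothetical pre-flight validator -- not part of the aws-kinesis-agg API.
// Returns an array of error messages; an empty array means the record is valid.
function validateUserRecord(record) {
  if (record === null || typeof record !== 'object') {
    return ['record must be an object'];
  }
  const errors = [];
  if (typeof record.partitionKey !== 'string' || record.partitionKey.length === 0) {
    errors.push('partitionKey is required and must be a non-empty string');
  }
  if (record.data == null) {
    errors.push('data is required and must not be null');
  } else if (!Buffer.isBuffer(record.data) && typeof record.data !== 'string') {
    errors.push('data must be a Buffer or string');
  }
  return errors;
}
```

Filtering out invalid records up front avoids having them surface through the error callback partway through aggregation.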

Size Management

The aggregation function automatically manages record sizes to stay within Kinesis limits:

  • Maximum Record Size: 1MB (1,048,576 bytes) minus overhead for magic number and checksum
  • Automatic Batching: Records are automatically grouped into optimally-sized batches
  • Oversized Record Detection: Records too large to fit in a single Kinesis record generate errors

// The function will automatically:
// 1. Calculate the size of each user record when encoded
// 2. Group records into batches that fit within 1MB limit
// 3. Generate encoded records when batches are full
// 4. Handle the final partial batch when all records are processed
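As an illustration of steps 1–3, here is a hypothetical greedy batcher. The size accounting is deliberately simplified (payload bytes plus partition key bytes); the library's real calculation also covers Protocol Buffers framing, the magic number, and the checksum, so treat this as a sketch of the idea rather than the actual algorithm:

```javascript
const MAX_RECORD_SIZE = 1048576; // 1 MB Kinesis record limit

// Hypothetical greedy batcher with simplified size accounting -- illustration only.
function batchBySize(records, maxBytes = MAX_RECORD_SIZE) {
  const batches = [];
  let current = [];
  let currentSize = 0;
  for (const record of records) {
    // Approximate encoded size: payload bytes plus partition key bytes.
    const size = Buffer.byteLength(record.data) + Buffer.byteLength(record.partitionKey);
    if (size > maxBytes) {
      throw new Error('record too large to fit in a single Kinesis record');
    }
    // Close the current batch when the next record would overflow it.
    if (currentSize + size > maxBytes && current.length > 0) {
      batches.push(current);
      current = [];
      currentSize = 0;
    }
    current.push(record);
    currentSize += size;
  }
  // Emit the final partial batch.
  if (current.length > 0) batches.push(current);
  return batches;
}
```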

Concurrency Control

The optional queueSize parameter controls how many encoded records are processed concurrently:

// Process encoded records one at a time (default)
agg.aggregate(records, handler, afterCallback, errorCallback, 1);

// Process up to 5 encoded records concurrently
agg.aggregate(records, handler, afterCallback, errorCallback, 5);

// Higher concurrency can improve throughput but may increase resource usage
// and the chance of throttling from downstream services like Kinesis
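To make the queueSize semantics concrete, the sketch below is a hypothetical, minimal concurrency limiter: at most limit handlers are in flight at once, and finishing one frees a slot for the next. It illustrates the behavior only and is not the library's implementation:

```javascript
// Hypothetical concurrency limiter illustrating queueSize semantics;
// not the library's actual implementation.
function makeLimiter(limit) {
  let active = 0;
  let maxActive = 0;
  const pending = [];

  function runNext() {
    if (active >= limit || pending.length === 0) return;
    active++;
    maxActive = Math.max(maxActive, active);
    const { task, resolve } = pending.shift();
    // Each task receives a done() callback, like an encodedRecordHandler.
    task(() => {
      active--;
      resolve();
      runNext(); // a finished task frees a slot for the next pending one
    });
  }

  return {
    push(task) {
      return new Promise((resolve) => {
        pending.push({ task, resolve });
        runNext();
      });
    },
    stats() {
      return { maxActive };
    }
  };
}
```

With limit set to 1 (the default queueSize), encoded records are handled strictly one at a time; raising the limit allows putRecord calls to overlap.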

Types

interface EncodedRecord {
  /** The partition key for the aggregated record */
  partitionKey: string;
  /** The encoded aggregated record data (Protocol Buffer + magic number + checksum) */
  data: Buffer;
  /** Optional explicit hash key for shard allocation */
  ExplicitHashKey?: string;
}

interface InputUserRecord {
  /** The partition key for this user record */
  partitionKey: string;
  /** The data payload for this user record */
  data: Buffer | string;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}
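A small hypothetical helper for building objects in the InputUserRecord shape, normalizing string payloads to UTF-8 Buffers (the helper name and behavior are illustrative, not part of the library):

```javascript
// Hypothetical helper -- builds an object matching the InputUserRecord shape.
function toUserRecord(partitionKey, payload, explicitHashKey) {
  const record = {
    partitionKey,
    // Strings are normalized to UTF-8 Buffers; Buffers pass through unchanged.
    data: Buffer.isBuffer(payload) ? payload : Buffer.from(String(payload), 'utf8')
  };
  if (explicitHashKey !== undefined) {
    record.explicitHashKey = explicitHashKey;
  }
  return record;
}
```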

Install with Tessl CLI

npx tessl i tessl/npm-aws-kinesis-agg
