tessl/npm-aws-kinesis-agg

Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/aws-kinesis-agg@4.2.x

To install, run

npx @tessl/cli install tessl/npm-aws-kinesis-agg@4.2.0

AWS Kinesis Aggregation

AWS Kinesis Aggregation is a Node.js module that simplifies working with Amazon Kinesis records using the Kinesis Aggregated Record Format and Protocol Buffers encoding. It enables efficient packing of multiple user records into larger aggregated records to maximize throughput and reduce costs when working with Amazon Kinesis streams.

Package Information

  • Package Name: aws-kinesis-agg
  • Package Type: npm
  • Language: JavaScript/TypeScript
  • Installation: npm install aws-kinesis-agg

Core Imports

const agg = require('aws-kinesis-agg');

For ES modules/TypeScript:

import { deaggregateSync, deaggregate, aggregate, RecordAggregator } from 'aws-kinesis-agg';

Basic Usage

const agg = require('aws-kinesis-agg');

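// Deaggregate a Kinesis record synchronously; the callback receives all user records at once.
// kinesisRecord is the Kinesis payload of a single event record,
// e.g. event.Records[0].kinesis inside an AWS Lambda handler.
// The second argument (computeChecksums) enables checksum validation.
agg.deaggregateSync(kinesisRecord, true, (err, userRecords) => {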
  if (err) {
    console.error('Deaggregation failed:', err);
  } else {
    console.log('Found', userRecords.length, 'user records');
    userRecords.forEach(record => {
      console.log('Record:', record.partitionKey, record.data);
    });
  }
});

// Aggregate multiple records
const records = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') }
];

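agg.aggregate(records,
  // encodedRecordHandler: called for each aggregated record
  (encodedRecord, callback) => {
    // Handle encoded record (e.g., send to Kinesis)
    console.log('Encoded record size:', encodedRecord.data.length);
    callback();
  },
  // afterPutAggregatedRecords: called once all records have been handled
  () => {
    console.log('All records processed');
  },
  // errorCallback: called if handling an encoded record fails
  (err, data) => {
    console.error('Error:', err);
  }
);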

Architecture

AWS Kinesis Aggregation is built around several key components:

  • Deaggregation Engine: Extracts individual user records from KPL-aggregated Kinesis records with checksum validation
  • Aggregation Engine: Packs multiple user records into aggregated records using Protocol Buffers encoding
  • Record Aggregator Class: Provides fine-grained control over record aggregation with size management
  • Format Compatibility: Supports both AWS Lambda v2 and v3 event formats
  • Protocol Buffer Integration: Uses protobufjs for efficient encoding/decoding of the KPL aggregation format

Capabilities

Record Deaggregation

Extract individual user records from aggregated Kinesis records. Supports both synchronous and asynchronous processing with optional checksum validation.

function deaggregateSync(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;

function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
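
To make the checksum validation more concrete, here is a minimal sketch of how a KPL-aggregated payload can be recognized and verified by hand. It assumes the published KPL aggregated record layout: a 4-byte magic prefix, a Protocol Buffers message, then a 16-byte MD5 digest of that message. The helper names `splitAggregatedPayload` and `checksumMatches` are hypothetical and are not part of aws-kinesis-agg; the library's own internals may differ.

```javascript
const crypto = require('crypto');

// Magic prefix that marks a KPL-aggregated record (assumed from the KPL format).
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
const MD5_LENGTH = 16;

// Hypothetical helper: split a base64 payload into the protobuf body and the
// trailing checksum. Returns null when the payload does not look aggregated.
function splitAggregatedPayload(base64Data) {
  const raw = Buffer.from(base64Data, 'base64');
  if (raw.length < KPL_MAGIC.length + MD5_LENGTH) return null;
  if (!raw.subarray(0, KPL_MAGIC.length).equals(KPL_MAGIC)) return null;
  return {
    body: raw.subarray(KPL_MAGIC.length, raw.length - MD5_LENGTH),
    checksum: raw.subarray(raw.length - MD5_LENGTH),
  };
}

// Hypothetical helper: recompute the MD5 of the body and compare it with
// the checksum carried at the end of the payload.
function checksumMatches({ body, checksum }) {
  const actual = crypto.createHash('md5').update(body).digest();
  return actual.equals(checksum);
}
```

Decoding the protobuf body into user records is left to the library; this sketch only illustrates what passing `computeChecksums = true` is expected to guard against.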

Record Aggregation

Aggregate multiple user records into efficiently packed Kinesis records. Includes queue-based processing with configurable concurrency.

function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;
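
Because `aggregate` is callback-based, it can be convenient to wrap it in a Promise. The sketch below relies only on the signature shown above; `aggregateAsync` and `sendRecord` are hypothetical names, and the module object is passed in as `agg` (for example `require('aws-kinesis-agg')`) rather than imported, so the wrapper makes no assumption about how the library is loaded.

```javascript
// Hypothetical wrapper, not part of aws-kinesis-agg.
// agg:        the module object exposing aggregate(...)
// records:    input user records ({ partitionKey, data, ... })
// sendRecord: function that ships one encoded record and may return a Promise
function aggregateAsync(agg, records, sendRecord) {
  return new Promise((resolve, reject) => {
    agg.aggregate(
      records,
      // encodedRecordHandler: forward each encoded record to sendRecord and
      // report the outcome through the library-supplied callback.
      (encodedRecord, callback) => {
        Promise.resolve()
          .then(() => sendRecord(encodedRecord))
          .then((data) => callback(null, data), (err) => callback(err));
      },
      // afterPutAggregatedRecords: everything has been handled.
      () => resolve(),
      // errorCallback: surface the first failure to the caller.
      (err) => reject(err)
    );
  });
}
```

A Promise settles only once, so if both the error callback and the completion callback fire, the first one to run decides the result.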

Record Aggregator Class

Fine-grained control over record aggregation with size management, buffering, and manual flushing capabilities.

class RecordAggregator {
  constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);
  
  addUserRecord(record: InputUserRecord): void;
  build(): EncodedRecord | undefined;
  flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
  length(): number;
  checkIfUserRecordFits(record: InputUserRecord): boolean;
  calculateUserRecordSize(record: InputUserRecord): number;
  clearRecords(): void;
  setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;
  aggregateRecords(records: InputUserRecord[], forceFlush: boolean, onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
}
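
As a rough illustration of manual size management, the sketch below packs records using only the methods listed above. `packRecords` is a hypothetical helper, not part of aws-kinesis-agg, and it takes the aggregator as a parameter. It assumes that `length()` reports the number of buffered records and that calling `clearRecords()` after `build()` is harmless; a single record too large to fit on its own is not handled here.

```javascript
// Hypothetical helper, not part of aws-kinesis-agg.
// aggregator: an object exposing the RecordAggregator methods listed above
// records:    input user records ({ partitionKey, data, ... })
function packRecords(aggregator, records) {
  const encoded = [];

  // Build whatever is buffered and reset the aggregator.
  const flush = () => {
    if (aggregator.length() === 0) return; // assumption: count of buffered records
    const built = aggregator.build();
    if (built) encoded.push(built);
    aggregator.clearRecords(); // assumption: safe even if build() already cleared
  };

  for (const record of records) {
    // Start a new aggregated record when the next one would not fit.
    if (!aggregator.checkIfUserRecordFits(record)) flush();
    aggregator.addUserRecord(record);
  }
  flush(); // emit the final, partially filled record

  return encoded;
}
```

With the real class this would be called as `packRecords(new RecordAggregator(), records)`; `aggregateRecords` with `forceFlush` covers a similar need through callbacks.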

Types

// Deaggregated user record (output from deaggregation functions)
interface UserRecord {
  partitionKey: string;
  explicitPartitionKey?: string;
  sequenceNumber: string;
  subSequenceNumber: number;
  data: string; // base64 encoded
}

// Input record for aggregation
interface InputUserRecord {
  partitionKey: string;
  data: Buffer | string;
  explicitHashKey?: string;
}

interface EncodedRecord {
  partitionKey: string;
  data: Buffer;
  ExplicitHashKey?: string;
}

interface KinesisStreamRecordPayload {
  partitionKey: string;
  explicitPartitionKey?: string;
  sequenceNumber: string;
  data: string;
}
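
Since `UserRecord.data` is a base64-encoded string, consumers usually need to decode it before use. A minimal sketch follows; `decodeUserRecordData` is a hypothetical helper and the sample record is made up for illustration.

```javascript
// Hypothetical helper: decode the base64 `data` field of a deaggregated record.
function decodeUserRecordData(userRecord, encoding = 'utf8') {
  return Buffer.from(userRecord.data, 'base64').toString(encoding);
}

// Made-up record shaped like the UserRecord interface above.
const sampleRecord = {
  partitionKey: 'key1',
  sequenceNumber: '1',
  subSequenceNumber: 0,
  data: Buffer.from('{"id":1}').toString('base64'),
};

const decoded = JSON.parse(decodeUserRecordData(sampleRecord));
console.log(decoded.id);
```

Keep the value as a `Buffer` (skip the `toString` step) when the payload is binary rather than text.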