tessl/npm-aws-kinesis-agg

Node.js module for aggregating and deaggregating Amazon Kinesis records using Protocol Buffers encoding

Record Deaggregation

Record deaggregation extracts individual user records from aggregated Kinesis records created by the Kinesis Producer Library (KPL) or other aggregation tools. The module provides two callback-based functions: deaggregateSync, which delivers all user records at once, and deaggregate, which delivers them one at a time. Both support optional checksum validation.

Capabilities

Synchronous Deaggregation

Extracts all user records from an aggregated Kinesis record and passes them to a callback as a single array. The function itself returns nothing.

/**
 * Synchronously deaggregate a Kinesis record into user records
 * @param kinesisRecord - The Kinesis record to deaggregate
 * @param computeChecksums - Whether to validate record checksums
 * @param afterRecordCallback - Callback with error and array of user records
 */
function deaggregateSync(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;

Usage Example:

const agg = require('aws-kinesis-agg');

// Process a Kinesis record from Lambda event
exports.handler = (event, context) => {
  event.Records.forEach(kinesisRecord => {
    agg.deaggregateSync(kinesisRecord.kinesis, true, (err, userRecords) => {
      if (err) {
        console.error('Deaggregation failed:', err);
        return;
      }
      
      console.log(`Found ${userRecords.length} user records`);
      userRecords.forEach((record, index) => {
        // record.data is a base64 string; decode it to recover the original bytes
        const userData = Buffer.from(record.data, 'base64');

        console.log(`Record ${index}:`, {
          partitionKey: record.partitionKey,
          sequenceNumber: record.sequenceNumber,
          subSequenceNumber: record.subSequenceNumber,
          dataLength: userData.length // decoded byte length, not base64 length
        });

        // processUserData is a placeholder for your own application logic
        processUserData(userData);
      });
    });
  });
};
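
Because deaggregateSync reports its result through a callback, it can be convenient to wrap it in a Promise when the surrounding handler uses async/await. The following is a minimal sketch, not part of the module: deaggregateToPromise and decodeEvent are hypothetical helper names, and the module object is passed in as `agg` (for example `require('aws-kinesis-agg')`) so the helpers stay easy to test.

```javascript
// Hypothetical helper: wraps the callback-style deaggregateSync in a Promise.
// `agg` is the module object, e.g. require('aws-kinesis-agg').
function deaggregateToPromise(agg, payload, computeChecksums = true) {
  return new Promise((resolve, reject) => {
    agg.deaggregateSync(payload, computeChecksums, (err, userRecords) => {
      if (err) {
        reject(err);
      } else {
        resolve(userRecords);
      }
    });
  });
}

// Hypothetical helper: deaggregates every record in a Lambda event and
// returns the decoded payload of each user record as a Buffer.
async function decodeEvent(agg, event) {
  const decoded = [];
  for (const record of event.Records) {
    const userRecords = await deaggregateToPromise(agg, record.kinesis);
    for (const userRecord of userRecords) {
      // User record data is base64 encoded, as described in Types below.
      decoded.push(Buffer.from(userRecord.data, 'base64'));
    }
  }
  return decoded;
}
```

A handler could then be written as `exports.handler = (event) => decodeEvent(require('aws-kinesis-agg'), event);`, letting a rejected Promise surface the deaggregation error.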

Asynchronous Deaggregation

Processes user records one at a time through a callback, allowing for streaming processing of large aggregated records.

/**
 * Asynchronously deaggregate a Kinesis record, calling perRecordCallback for each user record
 * @param kinesisRecord - The Kinesis record to deaggregate
 * @param computeChecksums - Whether to validate record checksums
 * @param perRecordCallback - Called for each extracted user record
 * @param afterRecordCallback - Called when all records processed or on error
 */
function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;

Usage Example:

const agg = require('aws-kinesis-agg');

// Process records asynchronously
agg.deaggregate(
  kinesisRecord.kinesis, 
  true,
  // Per-record callback
  (err, userRecord) => {
    if (err) {
      console.error('Error processing user record:', err);
      return;
    }
    
    // Process individual user record immediately
    console.log('Processing user record:', userRecord.partitionKey);
    const userData = Buffer.from(userRecord.data, 'base64');
    
    // Async processing (e.g., send to another service)
    processUserRecordAsync(userData);
  },
  // After-record callback
  (err, errorRecord) => {
    if (err) {
      console.error('Deaggregation completed with errors:', err);
      if (errorRecord) {
        console.error('Error record details:', errorRecord);
      }
    } else {
      console.log('Deaggregation completed successfully');
    }
  }
);
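
The two callbacks can also be combined into a single Promise that settles once the final callback fires. The sketch below accumulates a running summary in the per-record callback, so the full record array never has to be held in memory. summarizeRecord is a hypothetical name, and the module object is again passed in as `agg`. Since this page does not spell out when the per-record callback receives an error, the sketch simply remembers the first one and reports it at the end.

```javascript
// Hypothetical helper: counts user records and decoded bytes using the
// per-record callback, and settles when the final callback is invoked.
function summarizeRecord(agg, payload, computeChecksums = true) {
  return new Promise((resolve, reject) => {
    const summary = { count: 0, totalBytes: 0 };
    let perRecordError = null;

    agg.deaggregate(
      payload,
      computeChecksums,
      // Called once per user record.
      (err, userRecord) => {
        if (err) {
          // Assumption: keep the first per-record error and report it later.
          perRecordError = perRecordError || err;
          return;
        }
        summary.count += 1;
        summary.totalBytes += Buffer.from(userRecord.data, 'base64').length;
      },
      // Called once, after all records or on failure.
      (err) => {
        const failure = err || perRecordError;
        if (failure) {
          reject(failure);
        } else {
          resolve(summary);
        }
      }
    );
  });
}
```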

Checksum Validation

The computeChecksums parameter controls whether the module validates the MD5 checksum embedded in aggregated records.

// Enable checksum validation (recommended)
agg.deaggregateSync(record, true, callback);

// Disable checksum validation (faster but less safe)
agg.deaggregateSync(record, false, callback);

Important Notes:

  • Enabling checksum validation ensures data integrity but adds computational overhead
  • If checksum validation fails, an error is returned via the callback
  • Non-aggregated records are processed normally regardless of checksum setting
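
To make the check concrete, here is a rough sketch of what validating the checksum involves. It assumes the KPL aggregated record layout: a 4-byte magic number (0xF3 0x89 0x9A 0xC2), the Protocol Buffers message, and a trailing 16-byte MD5 digest of that message. hasValidKplChecksum is a hypothetical helper written for illustration, not the module's internal code.

```javascript
const crypto = require('crypto');

// Assumed KPL aggregated record layout: magic | protobuf message | MD5(message)
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
const MD5_LENGTH = 16;

// Hypothetical helper: returns true only when the data carries the KPL magic
// number and its trailing MD5 digest matches the protobuf message bytes.
function hasValidKplChecksum(base64Data) {
  const raw = Buffer.from(base64Data, 'base64');
  if (raw.length < KPL_MAGIC.length + MD5_LENGTH) {
    return false; // too short to be an aggregated record
  }
  if (!raw.subarray(0, KPL_MAGIC.length).equals(KPL_MAGIC)) {
    return false; // no magic number, so not an aggregated record
  }
  const message = raw.subarray(KPL_MAGIC.length, raw.length - MD5_LENGTH);
  const expected = raw.subarray(raw.length - MD5_LENGTH);
  const actual = crypto.createHash('md5').update(message).digest();
  return actual.equals(expected);
}
```

The extra cost of enabling validation is essentially one MD5 pass over the message bytes of each aggregated record.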

Error Handling

The deaggregation functions handle several error scenarios:

  • Invalid Checksum: When computeChecksums is true and checksum validation fails
  • Malformed Protocol Buffer: When the aggregated record format is corrupted
  • Processing Errors: When individual user record processing fails (async mode only)

For asynchronous deaggregation, an error raised while perRecordCallback is running is passed to afterRecordCallback, together with the user record that was being processed when it occurred.
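
When a Lambda invocation carries several Kinesis records, one corrupted record need not abort the whole batch. The sketch below is one possible approach rather than a prescribed pattern: it deaggregates every record, keeps the successes, and lists the failures by sequence number. deaggregateBatch is a hypothetical name, and the module object is passed in as `agg`.

```javascript
// Hypothetical helper: deaggregates a batch and isolates per-record failures.
// `kinesisRecords` is the Records array of a Lambda Kinesis event.
async function deaggregateBatch(agg, kinesisRecords, computeChecksums = true) {
  const settled = await Promise.allSettled(
    kinesisRecords.map(
      (record) =>
        new Promise((resolve, reject) => {
          agg.deaggregateSync(record.kinesis, computeChecksums, (err, userRecords) => {
            if (err) {
              reject(err);
            } else {
              resolve(userRecords);
            }
          });
        })
    )
  );

  const result = { userRecords: [], failures: [] };
  settled.forEach((outcome, index) => {
    if (outcome.status === 'fulfilled') {
      result.userRecords.push(...outcome.value);
    } else {
      // Record which Kinesis record failed so it can be logged or retried.
      result.failures.push({
        sequenceNumber: kinesisRecords[index].kinesis.sequenceNumber,
        error: outcome.reason
      });
    }
  });
  return result;
}
```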

Format Compatibility

The module automatically handles both AWS Lambda event formats:

  • v2 Format: Traditional lowercase property names (data, partitionKey)
  • v3 Format: PascalCase property names (Data, PartitionKey)

Records are automatically converted between formats as needed, ensuring compatibility across different Lambda runtime versions.
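
To illustrate what such a conversion amounts to, the following sketch maps a PascalCase payload onto the lowercase property names. It is not the module's internal implementation. toLowercasePayload is a hypothetical helper, and the SequenceNumber mapping is an assumption made by analogy with the two properties listed above.

```javascript
// Hypothetical helper: returns a payload with lowercase property names,
// accepting either naming style as input.
function toLowercasePayload(payload) {
  // Prefer the lowercase property when both styles are present.
  const pick = (lower, pascal) =>
    payload[lower] !== undefined ? payload[lower] : payload[pascal];

  return {
    partitionKey: pick('partitionKey', 'PartitionKey'),
    // Assumption: the sequence number follows the same naming pattern.
    sequenceNumber: pick('sequenceNumber', 'SequenceNumber'),
    data: pick('data', 'Data')
  };
}
```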

Types

interface KinesisStreamRecordPayload {
  /** The partition key of the record */
  partitionKey: string;
  /** Optional explicit hash key used for shard allocation */
  explicitPartitionKey?: string;
  /** The sequence number assigned by Kinesis */
  sequenceNumber: string;
  /** Base64-encoded record data */
  data: string;
}

interface UserRecord {
  /** The partition key provided when the record was submitted */
  partitionKey: string;
  /** The explicit hash key for shard allocation (if provided) */
  explicitPartitionKey?: string;
  /** The sequence number of the containing aggregated record */
  sequenceNumber: string;
  /** The position of this user record within the aggregated record */
  subSequenceNumber: number;
  /** The original data transmitted by the producer (base64 encoded) */
  data: string;
}
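
Since plain JavaScript does not enforce these interfaces, a small runtime check can catch malformed input before decoding. The sketch below validates the UserRecord shape described above and then decodes the base64 data. isUserRecord and parseJsonUserRecord are hypothetical helpers, and treating the payload as UTF-8 JSON is an assumption about the producer, not something the module requires.

```javascript
// Hypothetical helper: checks that a value matches the UserRecord shape.
function isUserRecord(value) {
  return (
    value !== null &&
    typeof value === 'object' &&
    typeof value.partitionKey === 'string' &&
    typeof value.sequenceNumber === 'string' &&
    Number.isInteger(value.subSequenceNumber) &&
    typeof value.data === 'string' &&
    (value.explicitPartitionKey === undefined ||
      typeof value.explicitPartitionKey === 'string')
  );
}

// Hypothetical helper: decodes the base64 data and parses it as JSON.
// Assumption: the producer wrote UTF-8 encoded JSON.
function parseJsonUserRecord(userRecord) {
  if (!isUserRecord(userRecord)) {
    throw new TypeError('Value does not match the UserRecord shape');
  }
  const text = Buffer.from(userRecord.data, 'base64').toString('utf8');
  return JSON.parse(text);
}
```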
