# AWS Kinesis Aggregation

AWS Kinesis Aggregation is a Node.js module that simplifies working with Amazon Kinesis records using the Kinesis Aggregated Record Format and Protocol Buffers encoding. It enables efficient packing of multiple user records into larger aggregated records to maximize throughput and reduce costs when working with Amazon Kinesis streams.

## Package Information

- **Package Name**: aws-kinesis-agg
- **Package Type**: npm
- **Language**: JavaScript/TypeScript
- **Installation**: `npm install aws-kinesis-agg`

## Core Imports

```javascript
const agg = require('aws-kinesis-agg');
```

For ES modules/TypeScript:

```typescript
import { deaggregateSync, deaggregate, aggregate, RecordAggregator } from 'aws-kinesis-agg';
```

## Basic Usage

```javascript
const agg = require('aws-kinesis-agg');

// Deaggregate a Kinesis record synchronously.
// `kinesisRecord` is assumed here to be the `kinesis` payload of one Lambda event record.
agg.deaggregateSync(kinesisRecord, true, (err, userRecords) => {
  if (err) {
    console.error('Deaggregation failed:', err);
  } else {
    console.log('Found', userRecords.length, 'user records');
    userRecords.forEach(record => {
      console.log('Record:', record.partitionKey, record.data);
    });
  }
});

// Aggregate multiple records
const records = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') }
];

agg.aggregate(records,
  // encodedRecordHandler: called for each aggregated record
  (encodedRecord, callback) => {
    // Handle encoded record (e.g., send to Kinesis)
    console.log('Encoded record size:', encodedRecord.data.length);
    callback();
  },
  // afterPutAggregatedRecords: called once all records are handled
  () => {
    console.log('All records processed');
  },
  // errorCallback: called when aggregation or the handler fails
  (err, data) => {
    console.error('Error:', err);
  }
);
```

## Architecture

AWS Kinesis Aggregation is built around several key components:

- **Deaggregation Engine**: Extracts individual user records from KPL-aggregated Kinesis records with checksum validation
- **Aggregation Engine**: Packs multiple user records into aggregated records using Protocol Buffers encoding
- **Record Aggregator Class**: Provides fine-grained control over record aggregation with size management
- **Format Compatibility**: Supports both AWS Lambda v2 and v3 event formats
- **Protocol Buffer Integration**: Uses protobufjs for efficient encoding/decoding of the KPL aggregation format
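
To make the checksum validation mentioned above concrete: a KPL-aggregated record is laid out as a 4-byte magic prefix (`0xF3 0x89 0x9A 0xC2`), a Protocol Buffers message, and a trailing 16-byte MD5 digest of that message. The following is a minimal sketch of that check using only Node's built-in `crypto` module. The helper names `splitAggregatedPayload` and `hasValidChecksum` are hypothetical and are not part of `aws-kinesis-agg`; the library's own implementation may differ.

```javascript
const crypto = require('crypto');

// Magic prefix that marks a KPL-aggregated record.
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
// Length in bytes of the MD5 digest appended after the protobuf message.
const MD5_LENGTH = 16;

// Hypothetical helper (not a library API): split a raw payload into the
// protobuf message and its trailing checksum.
// Returns null when the payload does not look like an aggregated record.
function splitAggregatedPayload(payload) {
  if (payload.length < KPL_MAGIC.length + MD5_LENGTH) return null;
  if (!payload.subarray(0, KPL_MAGIC.length).equals(KPL_MAGIC)) return null;
  return {
    message: payload.subarray(KPL_MAGIC.length, payload.length - MD5_LENGTH),
    checksum: payload.subarray(payload.length - MD5_LENGTH),
  };
}

// Hypothetical helper (not a library API): true when the trailing MD5
// digest matches the MD5 of the message bytes.
function hasValidChecksum(payload) {
  const parts = splitAggregatedPayload(payload);
  if (parts === null) return false;
  const actual = crypto.createHash('md5').update(parts.message).digest();
  return actual.equals(parts.checksum);
}
```

A payload that fails the magic-prefix test is an ordinary, non-aggregated record and would normally be passed through unchanged rather than rejected.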

## Capabilities

### Record Deaggregation

Extract individual user records from aggregated Kinesis records. Supports both synchronous and asynchronous processing with optional checksum validation.

```javascript { .api }
function deaggregateSync(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;

function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
```

[Record Deaggregation](./deaggregation.md)
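
As an illustration of how `deaggregateSync` might be used across a whole Lambda event, here is a minimal sketch. The function name `collectUserRecords` is hypothetical, and the `agg` module is passed in as a parameter so the sketch stays self-contained. It assumes that each entry in `event.Records` carries the record payload under `kinesis`, and that `deaggregateSync` invokes its callback before returning, as its name suggests.

```javascript
// Hypothetical helper (not a library API). `agg` is injected so the sketch
// can run without the package installed; in real code it would be
// require('aws-kinesis-agg').
function collectUserRecords(agg, event, done) {
  const collected = [];
  let failure = null;

  for (const lambdaRecord of event.Records) {
    // Assumption: the callback runs synchronously, before deaggregateSync returns.
    agg.deaggregateSync(lambdaRecord.kinesis, true, (err, userRecords) => {
      if (err) {
        failure = failure || err; // remember only the first error
        return;
      }
      for (const userRecord of userRecords) {
        collected.push({
          partitionKey: userRecord.partitionKey,
          // UserRecord.data is base64 encoded, so decode it to raw bytes.
          payload: Buffer.from(userRecord.data, 'base64'),
        });
      }
    });
    if (failure) break; // stop at the first record that fails
  }

  done(failure, failure ? undefined : collected);
}
```

Stopping at the first failure is a design choice of this sketch; a handler that prefers partial results could instead keep going and report the failed records separately.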

### Record Aggregation

Aggregate multiple user records into efficiently packed Kinesis records. Includes queue-based processing with configurable concurrency.

```javascript { .api }
function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;
```

[Record Aggregation](./aggregation.md)
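
The three callbacks of `aggregate` can be folded into a single Node-style completion callback. The following is a minimal sketch of that idea, based only on the signature shown above. The function name `aggregateToList` is hypothetical, and `agg` is passed in as a parameter rather than required, so the sketch does not depend on the package being installed.

```javascript
// Hypothetical helper (not a library API): gather every encoded record that
// `aggregate` produces and report the outcome through one callback.
function aggregateToList(agg, records, done) {
  const encoded = [];
  let finished = false;

  const finish = (err) => {
    if (finished) return; // report only the first outcome
    finished = true;
    done(err, err ? undefined : encoded);
  };

  agg.aggregate(
    records,
    // encodedRecordHandler: store the record, then signal that it was handled.
    (encodedRecord, callback) => {
      encoded.push(encodedRecord);
      callback();
    },
    // afterPutAggregatedRecords: every record has been handled.
    () => finish(null),
    // errorCallback: surface the error to the caller.
    (err) => finish(err)
  );
}
```

In a real producer, the handler would typically send each encoded record to Kinesis before invoking `callback`, instead of only collecting it in memory.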

### Record Aggregator Class

Fine-grained control over record aggregation with size management, buffering, and manual flushing capabilities.

```javascript { .api }
class RecordAggregator {
  constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);

  addUserRecord(record: InputUserRecord): void;
  build(): EncodedRecord | undefined;
  flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
  length(): number;
  checkIfUserRecordFits(record: InputUserRecord): boolean;
  calculateUserRecordSize(record: InputUserRecord): number;
  clearRecords(): void;
  setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;
  aggregateRecords(records: InputUserRecord[], forceFlush: boolean, onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
}
```

[Record Aggregator](./record-aggregator.md)
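
The size-management methods listed above suggest a manual batching loop along the following lines. This is a sketch under stated assumptions, not the library's own algorithm: `packRecords` is a hypothetical helper, the aggregator instance is passed in, `length()` is assumed to return the number of buffered user records, and `build()` is assumed to return the encoded form of whatever is currently buffered.

```javascript
// Hypothetical helper (not a library API): pack input records into as few
// encoded records as the aggregator's size check allows.
function packRecords(aggregator, records) {
  const encoded = [];

  // Emit the buffered batch, if any, and reset the aggregator.
  const emitBatch = () => {
    if (aggregator.length() === 0) return;
    encoded.push(aggregator.build());
    // Assumption: the source does not say whether build() resets the buffer,
    // so the buffer is cleared explicitly here.
    aggregator.clearRecords();
  };

  for (const record of records) {
    // Emit the current batch before adding a record that would overflow it.
    if (aggregator.length() > 0 && !aggregator.checkIfUserRecordFits(record)) {
      emitBatch();
    }
    aggregator.addUserRecord(record);
  }

  emitBatch(); // flush whatever is left over
  return encoded;
}
```

The sketch does not handle a single record that is too large to fit even in an empty aggregator; how `addUserRecord` reacts in that case is not stated in this document.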

## Types

```typescript { .api }
// Deaggregated user record (output from deaggregation functions)
interface UserRecord {
  partitionKey: string;
  explicitPartitionKey?: string;
  sequenceNumber: string;
  subSequenceNumber: number;
  data: string; // base64 encoded
}

// Input record for aggregation
interface InputUserRecord {
  partitionKey: string;
  data: Buffer | string;
  explicitHashKey?: string;
}

interface EncodedRecord {
  partitionKey: string;
  data: Buffer;
  ExplicitHashKey?: string;
}

interface KinesisStreamRecordPayload {
  partitionKey: string;
  explicitPartitionKey?: string;
  sequenceNumber: string;
  data: string;
}

```
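
Because `UserRecord.data` is a base64 string, consumers usually decode it before use. The following sketch shows one way to do that with Node's `Buffer`. The helper name `decodeUserRecord` is hypothetical, and it assumes the producer wrote UTF-8 text that may or may not be JSON.

```javascript
// Hypothetical helper (not a library API): decode the base64 payload of a
// deaggregated UserRecord into bytes, text and, where possible, parsed JSON.
function decodeUserRecord(userRecord) {
  const bytes = Buffer.from(userRecord.data, 'base64');
  // Assumption: the payload is UTF-8 text.
  const text = bytes.toString('utf8');

  let json;
  try {
    json = JSON.parse(text);
  } catch (parseError) {
    json = undefined; // payload is not JSON; callers can fall back to `text`
  }

  return {
    partitionKey: userRecord.partitionKey,
    sequenceNumber: userRecord.sequenceNumber,
    subSequenceNumber: userRecord.subSequenceNumber,
    bytes,
    text,
    json,
  };
}
```

Binary payloads should be read from `bytes` only, since the UTF-8 decoding in `text` is lossy for data that is not text.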