# AWS Kinesis Aggregation

AWS Kinesis Aggregation is a Node.js module for producing and consuming Amazon Kinesis records in the Kinesis Aggregated Record Format, the Protocol Buffers-based format used by the Kinesis Producer Library (KPL). It packs many small user records into one larger aggregated record and extracts them again on the consumer side. Aggregation raises effective throughput, because each shard limits records per second as well as bytes per second, and lowers cost, because Kinesis bills each record as at least a minimum payload size, so many tiny records waste billed capacity.

## Package Information

- **Package Name**: aws-kinesis-agg
- **Package Type**: npm
- **Language**: JavaScript/TypeScript
- **Installation**: `npm install aws-kinesis-agg`

## Core Imports

```javascript
const agg = require('aws-kinesis-agg');
```

For ES modules/TypeScript:

```typescript
import { deaggregateSync, deaggregate, aggregate, RecordAggregator } from 'aws-kinesis-agg';
```

## Basic Usage

```javascript
const agg = require('aws-kinesis-agg');

// Deaggregate a Kinesis record synchronously.
// `kinesisRecord` is the `kinesis` payload of a Lambda event record
// (event.Records[i].kinesis); `true` enables checksum validation.
agg.deaggregateSync(kinesisRecord, true, (err, userRecords) => {
  if (err) {
    console.error('Deaggregation failed:', err);
  } else {
    console.log('Found', userRecords.length, 'user records');
    userRecords.forEach(record => {
      // record.data is base64 encoded
      console.log('Record:', record.partitionKey, record.data);
    });
  }
});

// Aggregate multiple records
const records = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') }
];

agg.aggregate(records,
  // encodedRecordHandler: called for each aggregated record
  (encodedRecord, callback) => {
    // Handle the encoded record (e.g., send it to Kinesis), then signal completion
    console.log('Encoded record size:', encodedRecord.data.length);
    callback();
  },
  // afterPutAggregatedRecords: called once every encoded record has been handled
  () => {
    console.log('All records processed');
  },
  // errorCallback
  (err, data) => {
    console.error('Error:', err);
  }
);
```

## Architecture

AWS Kinesis Aggregation is built around several key components:

- **Deaggregation Engine**: Extracts individual user records from KPL-aggregated Kinesis records, with optional checksum validation
- **Aggregation Engine**: Packs multiple user records into aggregated records using Protocol Buffers encoding
- **Record Aggregator Class**: Provides fine-grained control over record aggregation with size management
- **Format Compatibility**: Supports both AWS Lambda v2 and v3 event formats
- **Protocol Buffer Integration**: Uses protobufjs for efficient encoding/decoding of the KPL aggregation format
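
In the KPL aggregated format, the Protocol Buffers payload is framed by a 4-byte magic number (`0xF3 0x89 0x9A 0xC2`) in front and a 16-byte MD5 digest of that payload at the end; the digest is what checksum validation verifies. The sketch below illustrates only that framing, using Node's built-in `crypto` module. It is not code from this package: `splitAggregatedRecord` is a hypothetical helper name, and it does not decode the protobuf payload itself.

```javascript
const crypto = require('crypto');

// Magic number that prefixes every KPL-aggregated record.
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
const MD5_LENGTH = 16;

// Hypothetical helper: separates the protobuf payload from the framing and
// checks the trailing MD5 digest. Returns null when the bytes do not start
// with the magic number, i.e. the record is not aggregated and would
// typically be treated as a single user record.
function splitAggregatedRecord(raw) {
  if (raw.length < KPL_MAGIC.length + MD5_LENGTH ||
      !raw.subarray(0, KPL_MAGIC.length).equals(KPL_MAGIC)) {
    return null;
  }
  const payload = raw.subarray(KPL_MAGIC.length, raw.length - MD5_LENGTH);
  const digest = raw.subarray(raw.length - MD5_LENGTH);
  const expected = crypto.createHash('md5').update(payload).digest();
  return { payload, checksumValid: expected.equals(digest) };
}
```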

## Capabilities

### Record Deaggregation

Extract individual user records from aggregated Kinesis records, with optional checksum validation. Two callback styles are available: `deaggregateSync` delivers all user records from one Kinesis record in a single `afterRecordCallback` call, while `deaggregate` calls `perRecordCallback` once per user record and then `afterRecordCallback` when processing finishes or fails.

```javascript { .api }
function deaggregateSync(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;

function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
```

[Record Deaggregation](./deaggregation.md)
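
In a Lambda function, each element of `event.Records` carries its Kinesis payload under `kinesis`. The following sketch wraps a function with the `deaggregateSync` signature in a Promise and flattens the user records of a whole event. `deaggregateAsync` and `extractUserRecords` are hypothetical helper names, not package exports; the deaggregation function is passed in as a parameter (normally `agg.deaggregateSync`) so the sketch runs without the package installed, and checksum validation defaults to on as a design choice of the sketch.

```javascript
// Wraps a function with the deaggregateSync signature
// (kinesisRecord, computeChecksums, afterRecordCallback) in a Promise.
function deaggregateAsync(deaggregateSyncFn, kinesisRecord, computeChecksums = true) {
  return new Promise((resolve, reject) => {
    deaggregateSyncFn(kinesisRecord, computeChecksums, (err, userRecords) => {
      if (err) {
        reject(err);
      } else {
        resolve(userRecords);
      }
    });
  });
}

// Deaggregates every record of a Lambda Kinesis event and returns one flat
// array of user records, preserving the order of event.Records.
async function extractUserRecords(deaggregateSyncFn, event) {
  const perRecord = await Promise.all(
    event.Records.map((r) => deaggregateAsync(deaggregateSyncFn, r.kinesis))
  );
  return perRecord.flat();
}
```

With the package installed, an async handler would call `extractUserRecords(agg.deaggregateSync, event)`.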

### Record Aggregation

Aggregate multiple user records into efficiently packed Kinesis records. Each aggregated record is passed to `encodedRecordHandler`, which must call its `callback` when it is done (for example, after sending the record to Kinesis); `afterPutAggregatedRecords` runs once all records have been handled, and `errorCallback` receives failures. Handler calls go through a queue whose concurrency is set by the optional `queueSize` argument.

```javascript { .api }
function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;
```

[Record Aggregation](./aggregation.md)
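
Callers that prefer Promises can wrap `aggregate` in the same way. This is an assumption-based adapter, not part of the package: `aggregateAsync` and `sendEncodedRecord` are hypothetical names, `sendEncodedRecord` stands for whatever actually writes to Kinesis and may return a Promise, and the aggregate function is injected so the sketch runs standalone. It assumes that an error passed to the handler's `callback` is reported through `errorCallback`; if that does not hold, a failed send would leave the Promise pending.

```javascript
// Adapts the callback-style aggregate(records, encodedRecordHandler,
// afterPutAggregatedRecords, errorCallback, queueSize) to a Promise.
function aggregateAsync(aggregateFn, records, sendEncodedRecord, queueSize) {
  return new Promise((resolve, reject) => {
    aggregateFn(
      records,
      // encodedRecordHandler: forward each encoded record, then signal completion.
      (encodedRecord, callback) => {
        Promise.resolve()
          .then(() => sendEncodedRecord(encodedRecord))
          .then(() => callback(), (err) => callback(err));
      },
      // afterPutAggregatedRecords: every encoded record has been handled.
      () => resolve(),
      // errorCallback: the first error settles the Promise; later calls are ignored.
      (err) => reject(err),
      queueSize
    );
  });
}
```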

### Record Aggregator Class

`RecordAggregator` offers fine-grained control over aggregation: it buffers user records, tracks their encoded size, and supports manual flushing.

```javascript { .api }
class RecordAggregator {
  constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);

  addUserRecord(record: InputUserRecord): void;
  build(): EncodedRecord | undefined;
  flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
  length(): number;
  checkIfUserRecordFits(record: InputUserRecord): boolean;
  calculateUserRecordSize(record: InputUserRecord): number;
  clearRecords(): void;
  setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;
  aggregateRecords(records: InputUserRecord[], forceFlush: boolean, onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
}
```

[Record Aggregator](./record-aggregator.md)
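
The buffering idea behind a size-managed aggregator can be illustrated without the package. The sketch below is a hypothetical, simplified stand-in and not `RecordAggregator`'s implementation: `SizeBoundedBuffer` and `estimateSize` are made-up names, the size estimate ignores Protocol Buffers overhead (which `calculateUserRecordSize` exists to account for), and `maxBytes` is an arbitrary caller-chosen limit rather than the package's own.

```javascript
// Hypothetical size estimate: partition key bytes plus data bytes.
// Unlike RecordAggregator.calculateUserRecordSize, this ignores
// Protocol Buffers framing overhead entirely.
function estimateSize(record) {
  return Buffer.byteLength(record.partitionKey) + Buffer.byteLength(record.data);
}

// Hypothetical size-bounded buffer: collects records until the next one
// would exceed maxBytes, then hands the current batch to onReady.
class SizeBoundedBuffer {
  constructor(maxBytes, onReady) {
    this.maxBytes = maxBytes;
    this.onReady = onReady;
    this.records = [];
    this.size = 0;
  }

  // True if the record can be added without exceeding maxBytes.
  fits(record) {
    return this.size + estimateSize(record) <= this.maxBytes;
  }

  add(record) {
    const recordSize = estimateSize(record);
    if (recordSize > this.maxBytes) {
      throw new Error('record alone exceeds maxBytes');
    }
    if (!this.fits(record)) {
      this.flush(); // emit the full batch before starting a new one
    }
    this.records.push(record);
    this.size += recordSize;
  }

  // Emits any buffered records and resets the buffer.
  flush() {
    if (this.records.length === 0) return;
    this.onReady(this.records);
    this.records = [];
    this.size = 0;
  }
}
```

The check-then-add-then-flush shape loosely corresponds to `checkIfUserRecordFits`, `addUserRecord`, and `flushBufferedRecords`, but the real class hands `EncodedRecord` objects to its callback rather than arrays of raw records.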

## Types

```typescript { .api }
// Deaggregated user record (output from deaggregation functions)
interface UserRecord {
  partitionKey: string;
  explicitPartitionKey?: string;
  sequenceNumber: string;
  subSequenceNumber: number;
  data: string; // base64 encoded
}

// Input record for aggregation
interface InputUserRecord {
  partitionKey: string;
  data: Buffer | string;
  explicitHashKey?: string;
}

// Aggregated record produced by aggregation, ready to send to Kinesis
interface EncodedRecord {
  partitionKey: string;
  data: Buffer;
  ExplicitHashKey?: string;
}

// Kinesis record payload as delivered to Lambda (event.Records[i].kinesis)
interface KinesisStreamRecordPayload {
  partitionKey: string;
  explicitPartitionKey?: string;
  sequenceNumber: string;
  data: string; // base64 encoded
}
```
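
Because `UserRecord.data` is base64 encoded, consumers usually decode it before use. A minimal sketch, assuming the producer wrote UTF-8 JSON; `decodeUserRecord` is a hypothetical helper, not part of the package:

```javascript
// Decodes the base64 `data` of a deaggregated UserRecord into bytes,
// then parses it as UTF-8 JSON (assumption: producers wrote JSON).
function decodeUserRecord(userRecord) {
  const bytes = Buffer.from(userRecord.data, 'base64');
  return {
    partitionKey: userRecord.partitionKey,
    sequenceNumber: userRecord.sequenceNumber,
    subSequenceNumber: userRecord.subSequenceNumber,
    payload: JSON.parse(bytes.toString('utf8')),
  };
}
```

For binary payloads, stop at `Buffer.from(userRecord.data, 'base64')` and skip the JSON step.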