# Record Aggregation

Record aggregation combines multiple user records into efficiently packed Kinesis records using the Kinesis Aggregated Record Format. This reduces the number of Kinesis API calls and maximizes throughput while staying within the 1MB record size limit.

## Capabilities

### Aggregate Function

Aggregates multiple user records into encoded records ready for Kinesis transmission. Uses a queue-based approach with configurable concurrency for handling encoded records.

```typescript { .api }
/**
 * Aggregate multiple user records into encoded Kinesis records
 * @param records - Array of user records to aggregate
 * @param encodedRecordHandler - Function to handle each encoded record
 * @param afterPutAggregatedRecords - Called when all records are processed
 * @param errorCallback - Called when errors occur during processing
 * @param queueSize - Optional concurrency limit for encoded record processing (default: 1)
 */
function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;
```

**Usage Example:**

```javascript
const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

// Prepare user records
const userRecords = [
  { partitionKey: 'user-123', data: Buffer.from(JSON.stringify({ event: 'login', userId: '123' })) },
  { partitionKey: 'user-456', data: Buffer.from(JSON.stringify({ event: 'logout', userId: '456' })) },
  { partitionKey: 'user-789', data: Buffer.from(JSON.stringify({ event: 'purchase', userId: '789' })) }
];

// Aggregate and send to Kinesis
agg.aggregate(
  userRecords,
  // Encoded record handler - called for each aggregated record
  (encodedRecord, callback) => {
    const params = {
      Data: encodedRecord.data,
      PartitionKey: encodedRecord.partitionKey,
      StreamName: 'my-kinesis-stream'
    };

    // Add explicit hash key if present
    if (encodedRecord.ExplicitHashKey) {
      params.ExplicitHashKey = encodedRecord.ExplicitHashKey;
    }

    // Send to Kinesis
    kinesis.putRecord(params, (err, result) => {
      if (err) {
        console.error('Failed to put record:', err);
      } else {
        console.log('Successfully put record:', result.SequenceNumber);
      }
      callback(err, result);
    });
  },
  // After all records processed
  () => {
    console.log('All aggregated records have been sent to Kinesis');
  },
  // Error handler
  (err, data) => {
    console.error('Aggregation error:', err);
    if (data) {
      console.error('Failed record:', data);
    }
  },
  // Optional: concurrency for encoded record processing
  3
);
```

### AWS Lambda Integration

A common pattern for AWS Lambda functions that process events and re-emit aggregated records:

```javascript
const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

exports.handler = async (event, context) => {
  // Process incoming events and create user records
  const userRecords = event.Records.map(record => {
    // Transform the incoming record (processIncomingRecord is application-specific)
    const processedData = processIncomingRecord(record);

    return {
      partitionKey: processedData.userId,
      data: Buffer.from(JSON.stringify(processedData)),
      explicitHashKey: processedData.shardKey // Optional
    };
  });

  return new Promise((resolve, reject) => {
    agg.aggregate(
      userRecords,
      (encodedRecord, callback) => {
        // Send aggregated record to the output stream
        const params = {
          Data: encodedRecord.data,
          PartitionKey: encodedRecord.partitionKey,
          StreamName: process.env.OUTPUT_STREAM_NAME
        };
        // Only set the hash key when the encoded record carries one
        if (encodedRecord.ExplicitHashKey) {
          params.ExplicitHashKey = encodedRecord.ExplicitHashKey;
        }
        kinesis.putRecord(params, callback);
      },
      () => {
        console.log(`Successfully aggregated and sent ${userRecords.length} records`);
        resolve({ statusCode: 200 });
      },
      (err, data) => {
        console.error('Aggregation failed:', err);
        reject(err);
      }
    );
  });
};
```

### Record Validation

The aggregation function validates input records and provides detailed error messages:

```javascript
// Records must have the required fields
const validRecord = {
  partitionKey: 'required-key',  // Required
  data: Buffer.from('data'),     // Required
  explicitHashKey: 'optional'    // Optional
};

// These will cause errors:
const invalidRecords = [
  { data: Buffer.from('data') },      // Missing partitionKey
  { partitionKey: 'key' },            // Missing data
  { partitionKey: 'key', data: null } // Invalid data
];
```

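
The checks described above can be sketched as a standalone function. This is an illustration only, not the library's actual implementation: `validateUserRecord` is a hypothetical helper, and the real error messages produced by `aws-kinesis-agg` may differ.

```javascript
// Illustrative sketch of the validation rules described above
// (hypothetical helper; not part of the aws-kinesis-agg API).
function validateUserRecord(record) {
  // partitionKey is required and must be a non-empty string
  if (!record || typeof record.partitionKey !== 'string' || record.partitionKey.length === 0) {
    return new Error('User record is missing a non-empty partitionKey');
  }
  // data is required
  if (record.data === null || record.data === undefined) {
    return new Error('User record is missing a data payload');
  }
  // data must be a Buffer or a string (per InputUserRecord)
  if (!Buffer.isBuffer(record.data) && typeof record.data !== 'string') {
    return new Error('User record data must be a Buffer or a string');
  }
  return null; // record is valid
}
```

Running each of the invalid records above through such a check yields an `Error`, while the valid record passes.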
### Size Management

The aggregation function automatically manages record sizes to stay within Kinesis limits:

- **Maximum Record Size**: 1MB (1,048,576 bytes) minus overhead for the magic number and checksum
- **Automatic Batching**: Records are automatically grouped into optimally-sized batches
- **Oversized Record Detection**: Records too large to fit in a single Kinesis record generate errors

```javascript
// The function will automatically:
// 1. Calculate the size of each user record when encoded
// 2. Group records into batches that fit within the 1MB limit
// 3. Generate encoded records when batches are full
// 4. Handle the final partial batch when all records are processed
```

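
The grouping steps above can be sketched as follows. This is a simplified illustration, not the library's implementation: the real encoder computes exact Protocol Buffers sizes, whereas `estimateSize`, `planBatches`, and the overhead constant here are rough hypothetical stand-ins.

```javascript
// Simplified sketch of the batching strategy (hypothetical helpers;
// the real library computes exact Protocol Buffers encoded sizes).
const MAX_RECORD_SIZE = 1024 * 1024; // 1MB Kinesis record limit
const AGGREGATION_OVERHEAD = 20;     // rough allowance for magic number + checksum

// Rough per-record cost: payload + partition key + protobuf framing
function estimateSize(record) {
  return record.data.length + Buffer.byteLength(record.partitionKey) + 8;
}

// Group user records into batches that stay under the size limit
function planBatches(records) {
  const batches = [];
  let current = [];
  let currentSize = AGGREGATION_OVERHEAD;
  for (const record of records) {
    const size = estimateSize(record);
    // Oversized record detection: cannot fit even in an empty batch
    if (size + AGGREGATION_OVERHEAD > MAX_RECORD_SIZE) {
      throw new Error('User record is too large to fit in a single Kinesis record');
    }
    // Close the current batch when the next record would overflow it
    if (currentSize + size > MAX_RECORD_SIZE && current.length > 0) {
      batches.push(current);
      current = [];
      currentSize = AGGREGATION_OVERHEAD;
    }
    current.push(record);
    currentSize += size;
  }
  // Handle the final partial batch
  if (current.length > 0) batches.push(current);
  return batches;
}
```

For example, three records of roughly 600KB each cannot share a 1MB record, so they end up in three separate batches, while three small records share one.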
### Concurrency Control

The optional `queueSize` parameter controls how many encoded records are processed concurrently:

```javascript
// Process encoded records one at a time (default)
agg.aggregate(records, handler, afterCallback, errorCallback, 1);

// Process up to 5 encoded records concurrently
agg.aggregate(records, handler, afterCallback, errorCallback, 5);

// Higher concurrency can improve throughput but may increase resource usage
// and the chance of throttling from downstream services like Kinesis
```

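
Conceptually, the queue behaves like the following sketch: at most `limit` handler invocations are in flight at once, and the completion callback fires after every item has finished. `processWithConcurrency` is a hypothetical helper for illustration, not part of the library's API.

```javascript
// Illustrative bounded-concurrency queue, similar in spirit to the
// queueSize behaviour described above (hypothetical helper).
function processWithConcurrency(items, handler, limit, done) {
  let index = 0;    // next item to dispatch
  let active = 0;   // handlers currently in flight
  let finished = 0; // handlers that have completed

  function next() {
    // Dispatch items while we are under the concurrency limit
    while (active < limit && index < items.length) {
      active += 1;
      const item = items[index++];
      handler(item, () => {
        active -= 1;
        finished += 1;
        if (finished === items.length) return done();
        next(); // a slot freed up; dispatch the next item
      });
    }
  }

  if (items.length === 0) return done();
  next();
}
```

With `limit` set to 2 and five items, no more than two handlers ever run at the same time, mirroring `queueSize` capping concurrent encoded-record processing.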
## Types

```typescript { .api }
interface EncodedRecord {
  /** The partition key for the aggregated record */
  partitionKey: string;
  /** The encoded aggregated record data (Protocol Buffer + magic number + checksum) */
  data: Buffer;
  /** Optional explicit hash key for shard allocation */
  ExplicitHashKey?: string;
}

interface InputUserRecord {
  /** The partition key for this user record */
  partitionKey: string;
  /** The data payload for this user record */
  data: Buffer | string;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}
```