# Record Aggregator Class

The RecordAggregator class provides fine-grained control over record aggregation with manual size management, buffering, and flushing capabilities. It's ideal for applications that need precise control over when aggregated records are generated and sent.
## Capabilities

### Constructor

Creates a new RecordAggregator instance with an optional callback that is invoked whenever an aggregated record is ready.

```javascript { .api }
/**
 * Create a new RecordAggregator instance
 * @param onReadyCallback - Optional callback for when aggregated records are ready
 */
constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);
```

**Usage Example:**

```javascript
const { RecordAggregator } = require('aws-kinesis-agg');

// Create aggregator with callback
const aggregator = new RecordAggregator((err, encodedRecord) => {
  if (err) {
    console.error('Aggregation error:', err);
  } else {
    console.log('Record ready:', encodedRecord.partitionKey, encodedRecord.data.length);
    // Send to Kinesis or other destination
  }
});

// Create aggregator without callback (manual handling)
const manualAggregator = new RecordAggregator();
```
### Add User Record

Adds a user record to the aggregator's internal buffer. When the buffer reaches the aggregated-record size limit, the callback is triggered automatically.

```javascript { .api }
/**
 * Add a user record to the aggregator
 * @param record - The user record to add
 * @throws Error if the record won't fit or is too large
 */
addUserRecord(record: InputUserRecord): void;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator((err, encodedRecord) => {
  // Handle the automatically generated encoded record
  sendToKinesis(encodedRecord);
});

try {
  // Add records one by one
  aggregator.addUserRecord({
    partitionKey: 'user-123',
    data: Buffer.from('user data 1')
  });

  aggregator.addUserRecord({
    partitionKey: 'user-456',
    data: Buffer.from('user data 2'),
    explicitHashKey: 'shard-key-456'
  });

  // When the internal buffer reaches ~1MB, the callback is triggered automatically

} catch (err) {
  console.error('Failed to add record:', err.message);
  // Possible errors:
  // - "Record.Data field is mandatory"
  // - "record.partitionKey field is mandatory"
  // - "record won't fit"
  // - "Input record (...) is too large to fit inside a single Kinesis record"
}
```
### Build Encoded Record

Manually builds an encoded record from all buffered user records and clears the buffer.

```javascript { .api }
/**
 * Build an encoded record from buffered user records and clear the buffer
 * @returns The encoded record, or undefined if no records are buffered
 */
build(): EncodedRecord | undefined;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

// Add records manually
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });

// Manually build the encoded record
const encodedRecord = aggregator.build();
if (encodedRecord) {
  console.log('Built record:', encodedRecord.partitionKey, encodedRecord.data.length);
  // Send to Kinesis or other destination
  sendToKinesis(encodedRecord);
}

// Buffer is now empty, ready for more records
console.log('Records in buffer:', aggregator.length()); // 0
```
### Flush Buffered Records

Triggers the callback with an encoded record built from any buffered user records, then clears the buffer.

```javascript { .api }
/**
 * Flush buffered records through the callback and clear the buffer
 * @param onReadyCallback - Optional callback; uses the instance callback if not provided
 */
flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });

// Flush with a custom callback
aggregator.flushBufferedRecords((err, encodedRecord) => {
  if (err) {
    console.error('Flush error:', err);
  } else if (encodedRecord) {
    console.log('Flushed record:', encodedRecord.data.length);
    sendToKinesis(encodedRecord);
  }
});

// Buffer is now empty
```
### Record Count

Returns the number of user records currently in the buffer.

```javascript { .api }
/**
 * Get the number of user records currently buffered
 * @returns Number of buffered records
 */
length(): number;
```
### Check Record Fit

Checks whether a user record would fit in the current buffer, without adding it.

```javascript { .api }
/**
 * Check if a user record would fit in the current buffer
 * @param record - The record to check
 * @returns True if the record would fit, false otherwise
 */
checkIfUserRecordFits(record: InputUserRecord): boolean;
```

**Usage Example:**

```javascript
// Construct with a callback so flushed records are delivered somewhere
const aggregator = new RecordAggregator((err, encodedRecord) => {
  if (encodedRecord) sendToKinesis(encodedRecord);
});

// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });

// Check if another record would fit
const largeRecord = { partitionKey: 'key2', data: Buffer.alloc(500000) }; // 500KB

if (aggregator.checkIfUserRecordFits(largeRecord)) {
  aggregator.addUserRecord(largeRecord);
  console.log('Large record added');
} else {
  // Flush the current buffer first (delivered through the constructor callback)
  aggregator.flushBufferedRecords();
  // Then add the large record
  aggregator.addUserRecord(largeRecord);
  console.log('Buffer flushed, large record added');
}
```
### Calculate Record Size

Calculates the size a user record would consume when encoded, without adding it to the buffer.

```javascript { .api }
/**
 * Calculate the size a user record would consume when encoded
 * @param record - The record to calculate size for
 * @returns Size in bytes
 * @throws Error if the record is invalid
 */
calculateUserRecordSize(record: InputUserRecord): number;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

const record = { partitionKey: 'test-key', data: Buffer.from('test data') };

try {
  const size = aggregator.calculateUserRecordSize(record);
  console.log(`Record would consume ${size} bytes when encoded`);

  // Use size information for batching decisions.
  // getCurrentBufferSize() is an application-provided helper that tracks
  // how many bytes have been added since the last flush.
  const remainingCapacity = 1048576 - getCurrentBufferSize(); // 1MB limit
  if (size <= remainingCapacity) {
    aggregator.addUserRecord(record);
  } else {
    console.log('Record too large for current buffer, flushing first');
    aggregator.flushBufferedRecords();
    aggregator.addUserRecord(record);
  }
} catch (err) {
  console.error('Invalid record:', err.message);
}
```
### Clear Records

Resets the aggregator to an empty state, discarding all buffered records.

```javascript { .api }
/**
 * Reset aggregator to empty state, discarding all buffered records
 */
clearRecords(): void;
```
### Set Callback

Sets or replaces the callback function used by automatic flushing and manual flush operations.

```javascript { .api }
/**
 * Set or update the onReadyCallback function
 * @param onReadyCallback - The new callback function
 * @returns The current callback function
 */
setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;
```
### Aggregate Records (Batch)

Adds multiple records at once, with optional automatic flushing at the end.

```javascript { .api }
/**
 * Add multiple records to the aggregator
 * @param records - Array of user records to add
 * @param forceFlush - If true, flush buffered records at the end
 * @param onReadyCallback - Optional callback for this operation
 */
aggregateRecords(
  records: InputUserRecord[],
  forceFlush: boolean,
  onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void
): void;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

const batchRecords = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') },
  { partitionKey: 'key3', data: Buffer.from('data3') }
];

// Add all records and force a flush
aggregator.aggregateRecords(batchRecords, true, (err, encodedRecord) => {
  if (err) {
    console.error('Batch aggregation error:', err);
  } else if (encodedRecord) {
    console.log('Batch encoded:', encodedRecord.data.length);
    sendToKinesis(encodedRecord);
  }
});
```
## Advanced Usage Patterns

### Stream Processing

```javascript
const { RecordAggregator } = require('aws-kinesis-agg');
const stream = require('stream');

class KinesisAggregatorStream extends stream.Transform {
  constructor(options = {}) {
    super({ objectMode: true });

    this.aggregator = new RecordAggregator((err, encodedRecord) => {
      if (err) {
        this.emit('error', err);
      } else {
        this.push(encodedRecord);
      }
    });
  }

  _transform(record, encoding, callback) {
    try {
      if (this.aggregator.checkIfUserRecordFits(record)) {
        this.aggregator.addUserRecord(record);
      } else {
        // Flush the current buffer, then add the new record
        this.aggregator.flushBufferedRecords();
        this.aggregator.addUserRecord(record);
      }
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _flush(callback) {
    // Flush any remaining records
    this.aggregator.flushBufferedRecords();
    callback();
  }
}
```
### Size-Based Batching

```javascript
const aggregator = new RecordAggregator((err, encodedRecord) => {
  if (encodedRecord) sendToKinesis(encodedRecord);
});
const TARGET_SIZE = 512 * 1024; // 512KB target size

// Running total of the encoded size of buffered records
let bufferedSize = 0;

function addRecordWithSizeControl(record) {
  const recordSize = aggregator.calculateUserRecordSize(record);

  if (bufferedSize + recordSize > TARGET_SIZE) {
    // Flush before adding if we'd exceed the target size
    aggregator.flushBufferedRecords();
    bufferedSize = 0;
  }

  aggregator.addUserRecord(record);
  bufferedSize += recordSize;
}
```
## Types

```typescript { .api }
interface InputUserRecord {
  /** The partition key for this user record */
  partitionKey: string;
  /** The data payload for this user record */
  data: Buffer | string;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}

interface EncodedRecord {
  /** The partition key for the aggregated record */
  partitionKey: string;
  /** The encoded aggregated record data */
  data: Buffer;
  /** Optional explicit hash key for shard allocation */
  ExplicitHashKey?: string;
}
```