# Record Deaggregation

Record deaggregation extracts individual user records from aggregated Kinesis records created by the Kinesis Producer Library (KPL) or other aggregation tools. The module provides both synchronous and asynchronous deaggregation methods with optional checksum validation.

## Capabilities

### Synchronous Deaggregation

Extracts all user records from an aggregated Kinesis record and passes them to `afterRecordCallback` as a single array.

```javascript { .api }
/**
 * Synchronously deaggregate a Kinesis record into user records
 * @param kinesisRecord - The Kinesis record to deaggregate
 * @param computeChecksums - Whether to validate record checksums
 * @param afterRecordCallback - Callback with error and array of user records
 */
function deaggregateSync(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;
```

**Usage Example:**

```javascript
const agg = require('aws-kinesis-agg');

// Process a Kinesis record from Lambda event
exports.handler = (event, context) => {
  event.Records.forEach(kinesisRecord => {
    agg.deaggregateSync(kinesisRecord.kinesis, true, (err, userRecords) => {
      if (err) {
        console.error('Deaggregation failed:', err);
        return;
      }

      console.log(`Found ${userRecords.length} user records`);
      userRecords.forEach((record, index) => {
        console.log(`Record ${index}:`, {
          partitionKey: record.partitionKey,
          sequenceNumber: record.sequenceNumber,
          subSequenceNumber: record.subSequenceNumber,
          // Length of the base64 string, not of the decoded payload
          dataLength: record.data.length
        });

        // Process individual user record
        const userData = Buffer.from(record.data, 'base64');
        // processUserData is an application-defined function
        processUserData(userData);
      });
    });
  });
};
```
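
Where promise-based code is preferred, the callback interface can be wrapped. The following is a minimal sketch, not part of the module: `deaggregateAsPromise` is a hypothetical helper, and the deaggregation function is passed in as an argument so the snippet runs without the package installed. `fakeDeaggregateSync` is a stand-in that only mimics the callback signature documented above.

```javascript
// Hypothetical helper (not part of aws-kinesis-agg): adapts a callback-style
// deaggregation function with the deaggregateSync signature to a Promise.
function deaggregateAsPromise(deaggregateFn, kinesisRecord, computeChecksums) {
  return new Promise((resolve, reject) => {
    deaggregateFn(kinesisRecord, computeChecksums, (err, userRecords) => {
      if (err) {
        reject(err);
      } else {
        resolve(userRecords);
      }
    });
  });
}

// Stand-in for agg.deaggregateSync so the sketch is self-contained.
const fakeDeaggregateSync = (record, computeChecksums, callback) => {
  callback(null, [
    { partitionKey: 'pk-1', sequenceNumber: '1', subSequenceNumber: 0, data: 'aGVsbG8=' }
  ]);
};

deaggregateAsPromise(fakeDeaggregateSync, {}, true)
  .then(userRecords => console.log(`Found ${userRecords.length} user records`))
  .catch(err => console.error('Deaggregation failed:', err));
```

With the real module, the equivalent call would presumably be `deaggregateAsPromise(agg.deaggregateSync, kinesisRecord.kinesis, true)`.
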

### Asynchronous Deaggregation

Processes user records one at a time through a callback, allowing for streaming processing of large aggregated records.

```javascript { .api }
/**
 * Asynchronously deaggregate a Kinesis record, calling perRecordCallback for each user record
 * @param kinesisRecord - The Kinesis record to deaggregate
 * @param computeChecksums - Whether to validate record checksums
 * @param perRecordCallback - Called for each extracted user record
 * @param afterRecordCallback - Called when all records processed or on error
 */
function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
```

**Usage Example:**

```javascript
const agg = require('aws-kinesis-agg');

// Process records asynchronously.
// kinesisRecord is assumed to be one entry of event.Records from a Lambda Kinesis event.
agg.deaggregate(
  kinesisRecord.kinesis,
  true,
  // Per-record callback
  (err, userRecord) => {
    if (err) {
      console.error('Error processing user record:', err);
      return;
    }

    // Process individual user record immediately
    console.log('Processing user record:', userRecord.partitionKey);
    const userData = Buffer.from(userRecord.data, 'base64');

    // Async processing (e.g., send to another service);
    // processUserRecordAsync is an application-defined function
    processUserRecordAsync(userData);
  },
  // After-record callback
  (err, errorRecord) => {
    if (err) {
      console.error('Deaggregation completed with errors:', err);
      if (errorRecord) {
        console.error('Error record details:', errorRecord);
      }
    } else {
      console.log('Deaggregation completed successfully');
    }
  }
);
```

### Checksum Validation

The `computeChecksums` parameter controls whether the module validates the MD5 checksum embedded in aggregated records.

```javascript
// Enable checksum validation (recommended)
agg.deaggregateSync(record, true, callback);

// Disable checksum validation (faster but less safe)
agg.deaggregateSync(record, false, callback);
```

**Important Notes:**

- Enabling checksum validation ensures data integrity but adds computational overhead
- If checksum validation fails, an error is returned via the callback
- Non-aggregated records are processed normally regardless of checksum setting
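
To illustrate what validation involves, here is a minimal sketch of an MD5 check on a raw aggregated record. It assumes the KPL aggregated record layout of a 4-byte magic prefix, a protobuf-encoded body, and a trailing 16-byte MD5 digest of that body. `hasValidChecksum` is a hypothetical helper written for illustration, not the module's actual implementation.

```javascript
const crypto = require('crypto');

// Assumed layout: [4 magic bytes][protobuf body][16-byte MD5 of the body]
const MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
const DIGEST_SIZE = 16;

// Hypothetical helper: returns true only when the buffer carries the magic
// prefix and its trailing digest matches the MD5 of the body.
function hasValidChecksum(raw) {
  if (raw.length < MAGIC.length + DIGEST_SIZE) {
    return false; // too short to be an aggregated record
  }
  if (!raw.subarray(0, MAGIC.length).equals(MAGIC)) {
    return false; // no magic prefix, so not an aggregated record
  }
  const body = raw.subarray(MAGIC.length, raw.length - DIGEST_SIZE);
  const expected = raw.subarray(raw.length - DIGEST_SIZE);
  const actual = crypto.createHash('md5').update(body).digest();
  return actual.equals(expected);
}
```

Hashing the full body of every record is the source of the overhead noted above, which is why skipping it is faster.
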

### Error Handling

The deaggregation functions handle several error scenarios:

- **Invalid Checksum**: When `computeChecksums` is true and checksum validation fails
- **Malformed Protocol Buffer**: When the aggregated record format is corrupted
- **Processing Errors**: When individual user record processing fails (async mode only)

For asynchronous deaggregation, errors during `perRecordCallback` execution will trigger the `afterRecordCallback` with error details and the problematic record information.

### Format Compatibility

The module automatically handles both AWS Lambda event formats:

- **v2 Format**: Traditional lowercase property names (`data`, `partitionKey`)
- **v3 Format**: PascalCase property names (`Data`, `PartitionKey`)

Records are automatically converted between formats as needed, ensuring compatibility across different Lambda runtime versions.
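
The kind of conversion described here can be sketched as follows. `toLowerCamelKeys` is a hypothetical helper written for illustration; the module's internal conversion may differ.

```javascript
// Hypothetical helper: returns a shallow copy of the record with the first
// letter of each top-level property name lowercased (Data -> data).
function toLowerCamelKeys(record) {
  const normalized = {};
  for (const [key, value] of Object.entries(record)) {
    normalized[key.charAt(0).toLowerCase() + key.slice(1)] = value;
  }
  return normalized;
}

const pascalCaseRecord = { Data: 'aGVsbG8=', PartitionKey: 'pk-1', SequenceNumber: '1' };
console.log(toLowerCamelKeys(pascalCaseRecord));
```

Records that already use lowercase names pass through unchanged, so the same helper could be applied to either format.
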

## Types

```typescript { .api }
interface KinesisStreamRecordPayload {
  /** The partition key of the record */
  partitionKey: string;
  /** Optional explicit partition key for shard allocation */
  explicitPartitionKey?: string;
  /** The sequence number assigned by Kinesis */
  sequenceNumber: string;
  /** Base64-encoded record data */
  data: string;
}

interface UserRecord {
  /** The partition key provided when the record was submitted */
  partitionKey: string;
  /** The explicit hash key for shard allocation (if provided) */
  explicitPartitionKey?: string;
  /** The sequence number of the containing aggregated record */
  sequenceNumber: string;
  /** The position of this user record within the aggregated record */
  subSequenceNumber: number;
  /** The original data transmitted by the producer (base64 encoded) */
  data: string;
}
```
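
As a rough illustration of working with these types, the sketch below decodes a `UserRecord` payload and builds an identifier from `sequenceNumber` and `subSequenceNumber`. `decodeUserRecord` and the `id` format are assumptions made for this example, not part of the module.

```javascript
// Hypothetical helper: decodes the base64 payload of a UserRecord and derives
// an identifier from the aggregated record's sequence number plus the user
// record's position within it.
function decodeUserRecord(userRecord) {
  return {
    id: `${userRecord.sequenceNumber}:${userRecord.subSequenceNumber}`,
    partitionKey: userRecord.partitionKey,
    payload: Buffer.from(userRecord.data, 'base64')
  };
}

const decoded = decodeUserRecord({
  partitionKey: 'pk-1',
  sequenceNumber: '4959',
  subSequenceNumber: 2,
  data: 'aGVsbG8='
});
console.log(decoded.id, decoded.payload.toString('utf8'));
```
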