# Record Deaggregation


Record deaggregation extracts individual user records from aggregated Kinesis records created by the Kinesis Producer Library (KPL) or other aggregation tools. The module provides both synchronous and asynchronous deaggregation methods with optional checksum validation.


## Capabilities


### Synchronous Deaggregation


Extracts all user records from an aggregated Kinesis record and returns them as an array.

```javascript { .api }
/**
 * Synchronously deaggregate a Kinesis record into user records
 * @param kinesisRecord - The Kinesis record to deaggregate
 * @param computeChecksums - Whether to validate record checksums
 * @param afterRecordCallback - Callback with error and array of user records
 */
function deaggregateSync(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  afterRecordCallback: (err: Error, userRecords?: UserRecord[]) => void
): void;
```

**Usage Example:**

```javascript
const agg = require('aws-kinesis-agg');

// Process a Kinesis record from Lambda event
exports.handler = (event, context) => {
  event.Records.forEach(kinesisRecord => {
    agg.deaggregateSync(kinesisRecord.kinesis, true, (err, userRecords) => {
      if (err) {
        console.error('Deaggregation failed:', err);
        return;
      }

      console.log(`Found ${userRecords.length} user records`);
      userRecords.forEach((record, index) => {
        console.log(`Record ${index}:`, {
          partitionKey: record.partitionKey,
          sequenceNumber: record.sequenceNumber,
          subSequenceNumber: record.subSequenceNumber,
          dataLength: record.data.length
        });

        // Process individual user record
        // (processUserData is an application-defined handler, not part of aws-kinesis-agg)
        const userData = Buffer.from(record.data, 'base64');
        processUserData(userData);
      });
    });
  });
};
```

### Asynchronous Deaggregation


Processes user records one at a time through a callback, allowing for streaming processing of large aggregated records.

```javascript { .api }
/**
 * Asynchronously deaggregate a Kinesis record, calling perRecordCallback for each user record
 * @param kinesisRecord - The Kinesis record to deaggregate
 * @param computeChecksums - Whether to validate record checksums
 * @param perRecordCallback - Called for each extracted user record
 * @param afterRecordCallback - Called when all records processed or on error
 */
function deaggregate(
  kinesisRecord: KinesisStreamRecordPayload,
  computeChecksums: boolean,
  perRecordCallback: (err: Error, userRecord?: UserRecord) => void,
  afterRecordCallback: (err?: Error, errorUserRecord?: UserRecord) => void
): void;
```

**Usage Example:**

```javascript
const agg = require('aws-kinesis-agg');

// Process records asynchronously
// (kinesisRecord is one entry of event.Records from a Lambda Kinesis event)
agg.deaggregate(
  kinesisRecord.kinesis,
  true,
  // Per-record callback
  (err, userRecord) => {
    if (err) {
      console.error('Error processing user record:', err);
      return;
    }

    // Process individual user record immediately
    console.log('Processing user record:', userRecord.partitionKey);
    const userData = Buffer.from(userRecord.data, 'base64');

    // Async processing (e.g., send to another service)
    // (processUserRecordAsync is an application-defined handler)
    processUserRecordAsync(userData);
  },
  // After-record callback
  (err, errorRecord) => {
    if (err) {
      console.error('Deaggregation completed with errors:', err);
      if (errorRecord) {
        console.error('Error record details:', errorRecord);
      }
    } else {
      console.log('Deaggregation completed successfully');
    }
  }
);
```

### Checksum Validation


The `computeChecksums` parameter controls whether the module validates the MD5 checksum embedded in aggregated records.

```javascript
// Enable checksum validation (recommended)
agg.deaggregateSync(record, true, callback);

// Disable checksum validation (faster but less safe)
agg.deaggregateSync(record, false, callback);
```

**Important Notes:**

- Enabling checksum validation ensures data integrity but adds computational overhead
- If checksum validation fails, an error is returned via the callback
- Non-aggregated records are processed normally regardless of checksum setting
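
To make the checksum step more concrete, here is a minimal sketch of what validating an aggregated payload can look like. It assumes the KPL aggregated layout of a 4-byte magic prefix, a protobuf body, and a trailing 16-byte MD5 digest of that body. The helper names `isAggregated` and `checksumMatches` are hypothetical and are not part of the `aws-kinesis-agg` API; the module's internal implementation may differ.

```javascript
const crypto = require('crypto');

// Magic prefix that marks a KPL aggregated record (assumed layout, see above).
const MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
const DIGEST_SIZE = 16; // length of an MD5 digest in bytes

// Hypothetical helper: does the decoded payload look like a KPL aggregate?
function isAggregated(payload) {
  return payload.length > MAGIC.length + DIGEST_SIZE &&
    payload.subarray(0, MAGIC.length).equals(MAGIC);
}

// Hypothetical helper: recompute the MD5 of the body and compare it
// with the digest stored in the last 16 bytes.
function checksumMatches(payload) {
  const body = payload.subarray(MAGIC.length, payload.length - DIGEST_SIZE);
  const expected = payload.subarray(payload.length - DIGEST_SIZE);
  const actual = crypto.createHash('md5').update(body).digest();
  return actual.equals(expected);
}

// Build a stand-in payload. The body is arbitrary bytes here,
// not a real protobuf message, because only the framing is illustrated.
const body = Buffer.from('stand-in for the protobuf body');
const digest = crypto.createHash('md5').update(body).digest();
const sample = Buffer.concat([MAGIC, body, digest]);

console.log(isAggregated(sample), checksumMatches(sample)); // true true
```

A payload that fails the `isAggregated` check would be treated as a plain record, which matches the note above that non-aggregated records are unaffected by the checksum setting.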


### Error Handling


The deaggregation functions handle several error scenarios:

- **Invalid Checksum**: When `computeChecksums` is true and checksum validation fails
- **Malformed Protocol Buffer**: When the aggregated record format is corrupted
- **Processing Errors**: When individual user record processing fails (async mode only)

For asynchronous deaggregation, errors during `perRecordCallback` execution will trigger the `afterRecordCallback` with error details and the problematic record information.
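
For the synchronous path, one way to keep a single corrupted record from aborting a whole batch is to collect failures alongside the successfully extracted user records. The sketch below is illustrative only: `deaggregateBatch` is a hypothetical helper, the `agg` module is passed in as an argument, and it assumes that `deaggregateSync` invokes its callback before returning, as its name suggests.

```javascript
// Hypothetical helper: deaggregate every record in a batch and
// separate the extracted user records from the records that failed.
// `agg` is expected to expose deaggregateSync as documented above.
function deaggregateBatch(agg, records, computeChecksums) {
  const userRecords = [];
  const failures = [];

  records.forEach((record) => {
    agg.deaggregateSync(record.kinesis, computeChecksums, (err, extracted) => {
      if (err) {
        // Remember which Kinesis record failed so it can be logged or retried.
        failures.push({ sequenceNumber: record.kinesis.sequenceNumber, error: err });
        return;
      }
      userRecords.push(...extracted);
    });
  });

  return { userRecords, failures };
}

// Possible usage inside a Lambda handler:
//   const agg = require('aws-kinesis-agg');
//   const { userRecords, failures } = deaggregateBatch(agg, event.Records, true);
```

Whether a failed record should be skipped, retried, or should fail the whole invocation depends on the application; the sketch only makes both outcomes visible to the caller.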


### Format Compatibility


The module automatically handles both AWS Lambda event formats:

- **v2 Format**: Traditional lowercase property names (`data`, `partitionKey`)
- **v3 Format**: PascalCase property names (`Data`, `PartitionKey`)

Records are automatically converted between formats as needed, ensuring compatibility across different Lambda runtime versions.
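
The module performs this conversion itself, so callers normally do not need to. Purely as an illustration of the idea, a conversion from PascalCase to lowercase-first property names could be sketched as follows; `toLowerCamelKeys` is a hypothetical helper and does not reflect the module's actual code.

```javascript
// Hypothetical helper: return a shallow copy of a record whose
// top-level property names start with a lowercase letter.
function toLowerCamelKeys(record) {
  const normalized = {};
  for (const [key, value] of Object.entries(record)) {
    normalized[key.charAt(0).toLowerCase() + key.slice(1)] = value;
  }
  return normalized;
}

console.log(toLowerCamelKeys({ Data: 'aGVsbG8=', PartitionKey: 'user-1' }));
// { data: 'aGVsbG8=', partitionKey: 'user-1' }
```

Records that already use lowercase names pass through unchanged, so the same helper can be applied to either format.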


## Types

```typescript { .api }
interface KinesisStreamRecordPayload {
  /** The partition key of the record */
  partitionKey: string;
  /** Optional explicit partition key for shard allocation */
  explicitPartitionKey?: string;
  /** The sequence number assigned by Kinesis */
  sequenceNumber: string;
  /** Base64-encoded record data */
  data: string;
}

interface UserRecord {
  /** The partition key provided when the record was submitted */
  partitionKey: string;
  /** The explicit hash key for shard allocation (if provided) */
  explicitPartitionKey?: string;
  /** The sequence number of the containing aggregated record */
  sequenceNumber: string;
  /** The position of this user record within the aggregated record */
  subSequenceNumber: number;
  /** The original data transmitted by the producer (base64 encoded) */
  data: string;
}
```
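
Because `UserRecord.data` is base64 encoded, consumers typically decode it before use. The following sketch assumes the producer wrote UTF-8 JSON payloads, which is an application-level assumption and not something the module guarantees; `decodeUserRecord` is a hypothetical helper.

```javascript
// Hypothetical helper: decode a UserRecord's base64 data into a JS value,
// assuming the producer wrote UTF-8 encoded JSON.
function decodeUserRecord(userRecord) {
  const text = Buffer.from(userRecord.data, 'base64').toString('utf8');
  return JSON.parse(text);
}

// Build a sample UserRecord shaped like the interface above.
const sampleRecord = {
  partitionKey: 'user-1',
  sequenceNumber: '1',
  subSequenceNumber: 0,
  data: Buffer.from(JSON.stringify({ id: 1 })).toString('base64')
};

console.log(decodeUserRecord(sampleRecord)); // { id: 1 }
```

For binary or non-JSON payloads, stop after `Buffer.from(userRecord.data, 'base64')` and hand the resulting buffer to the appropriate parser.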