
# Record Aggregation

Record aggregation combines multiple user records into efficiently packed Kinesis records using the Kinesis Aggregated Record Format. This reduces the number of Kinesis API calls and maximizes throughput while staying within the 1MB record size limit.

## Capabilities

### Aggregate Function

Aggregates multiple user records into encoded records ready for Kinesis transmission. Uses a queue-based approach with configurable concurrency for handling encoded records.

```typescript { .api }
/**
 * Aggregate multiple user records into encoded Kinesis records
 * @param records - Array of user records to aggregate
 * @param encodedRecordHandler - Function to handle each encoded record
 * @param afterPutAggregatedRecords - Called when all records are processed
 * @param errorCallback - Called when errors occur during processing
 * @param queueSize - Optional concurrency limit for encoded record processing (default: 1)
 */
function aggregate(
  records: any[],
  encodedRecordHandler: (encodedRecord: EncodedRecord, callback: (err?: Error, data?: any) => void) => void,
  afterPutAggregatedRecords: () => void,
  errorCallback: (error: Error, data?: EncodedRecord) => void,
  queueSize?: number
): void;
```

**Usage Example:**

```javascript
const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

// Prepare user records
const userRecords = [
  { partitionKey: 'user-123', data: Buffer.from(JSON.stringify({ event: 'login', userId: '123' })) },
  { partitionKey: 'user-456', data: Buffer.from(JSON.stringify({ event: 'logout', userId: '456' })) },
  { partitionKey: 'user-789', data: Buffer.from(JSON.stringify({ event: 'purchase', userId: '789' })) }
];

// Aggregate and send to Kinesis
agg.aggregate(
  userRecords,
  // Encoded record handler - called for each aggregated record
  (encodedRecord, callback) => {
    const params = {
      Data: encodedRecord.data,
      PartitionKey: encodedRecord.partitionKey,
      StreamName: 'my-kinesis-stream'
    };

    // Add explicit hash key if present
    if (encodedRecord.ExplicitHashKey) {
      params.ExplicitHashKey = encodedRecord.ExplicitHashKey;
    }

    // Send to Kinesis
    kinesis.putRecord(params, (err, result) => {
      if (err) {
        console.error('Failed to put record:', err);
      } else {
        console.log('Successfully put record:', result.SequenceNumber);
      }
      callback(err, result);
    });
  },
  // After all records processed
  () => {
    console.log('All aggregated records have been sent to Kinesis');
  },
  // Error handler
  (err, data) => {
    console.error('Aggregation error:', err);
    if (data) {
      console.error('Failed record:', data);
    }
  },
  // Optional: concurrency for encoded record processing
  3
);
```

### AWS Lambda Integration

Common pattern for AWS Lambda functions that process events and re-emit aggregated records:

```javascript
const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

exports.handler = async (event, context) => {
  // Process incoming events and create user records
  const userRecords = event.Records.map(record => {
    // Transform the incoming record
    const processedData = processIncomingRecord(record);

    return {
      partitionKey: processedData.userId,
      data: Buffer.from(JSON.stringify(processedData)),
      explicitHashKey: processedData.shardKey // Optional
    };
  });

  return new Promise((resolve, reject) => {
    agg.aggregate(
      userRecords,
      (encodedRecord, callback) => {
        // Send aggregated record to output stream
        kinesis.putRecord({
          Data: encodedRecord.data,
          PartitionKey: encodedRecord.partitionKey,
          StreamName: process.env.OUTPUT_STREAM_NAME,
          ExplicitHashKey: encodedRecord.ExplicitHashKey
        }, callback);
      },
      () => {
        console.log(`Successfully aggregated and sent ${userRecords.length} records`);
        resolve({ statusCode: 200 });
      },
      (err, data) => {
        console.error('Aggregation failed:', err);
        reject(err);
      }
    );
  });
};
```

### Record Validation

The aggregation function validates input records and provides detailed error messages:

```javascript
// Records must have required fields
const validRecord = {
  partitionKey: 'required-key', // Required
  data: Buffer.from('data'), // Required
  explicitHashKey: 'optional' // Optional
};

// These will cause errors:
const invalidRecords = [
  { data: Buffer.from('data') }, // Missing partitionKey
  { partitionKey: 'key' }, // Missing data
  { partitionKey: 'key', data: null } // Invalid data
];
```
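The rules above can be sketched as a small standalone check. `validateUserRecord` is a hypothetical helper written for illustration, not part of the `aws-kinesis-agg` API, which performs its own validation internally:

```javascript
// Hypothetical helper mirroring the documented validation rules.
function validateUserRecord(record) {
  const errors = [];
  if (!record || typeof record.partitionKey !== 'string' || record.partitionKey.length === 0) {
    errors.push('partitionKey is required and must be a non-empty string');
  }
  if (!record || record.data == null || !(Buffer.isBuffer(record.data) || typeof record.data === 'string')) {
    errors.push('data is required and must be a Buffer or string');
  }
  return errors;
}

console.log(validateUserRecord({ partitionKey: 'key', data: Buffer.from('data') })); // prints []
console.log(validateUserRecord({ data: Buffer.from('data') })); // reports the missing partitionKey
```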

### Size Management

The aggregation function automatically manages record sizes to stay within Kinesis limits:

- **Maximum Record Size**: 1MB (1,048,576 bytes) minus overhead for the magic number and checksum
- **Automatic Batching**: Records are automatically grouped into optimally-sized batches
- **Oversized Record Detection**: Records too large to fit in a single Kinesis record generate errors

The function will automatically:

1. Calculate the size of each user record when encoded
2. Group records into batches that fit within the 1MB limit
3. Generate encoded records when batches are full
4. Handle the final partial batch when all records are processed
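A simplified sketch of that batching loop, assuming a per-record cost of partition-key bytes plus data bytes (the library's real accounting follows the protobuf wire format, so these numbers are illustrative, and `batchBySize` is a hypothetical name):

```javascript
const MAX_RECORD_SIZE = 1024 * 1024; // 1,048,576 bytes, before envelope overhead

// Illustrative only: split user records into batches that stay under the limit.
function batchBySize(records, maxBytes = MAX_RECORD_SIZE) {
  const batches = [];
  let current = [];
  let currentBytes = 0;
  for (const record of records) {
    const recordBytes = Buffer.byteLength(record.partitionKey) + record.data.length;
    if (recordBytes > maxBytes) {
      // Oversized record detection: too large for a single Kinesis record
      throw new Error(`record exceeds maximum size: ${recordBytes} bytes`);
    }
    if (currentBytes + recordBytes > maxBytes && current.length > 0) {
      batches.push(current); // batch is full - emit it
      current = [];
      currentBytes = 0;
    }
    current.push(record);
    currentBytes += recordBytes;
  }
  if (current.length > 0) batches.push(current); // final partial batch
  return batches;
}

// e.g. three 400KB records fit into two batches under the 1MB limit
const demo = Array.from({ length: 3 }, (_, i) => ({ partitionKey: `user-${i}`, data: Buffer.alloc(400 * 1024) }));
console.log(batchBySize(demo).length); // 2
```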

### Concurrency Control

The optional `queueSize` parameter controls how many encoded records are processed concurrently:

```javascript
// Process encoded records one at a time (default)
agg.aggregate(records, handler, afterCallback, errorCallback, 1);

// Process up to 5 encoded records concurrently
agg.aggregate(records, handler, afterCallback, errorCallback, 5);
```

Higher concurrency can improve throughput but may increase resource usage and the chance of throttling from downstream services such as Kinesis.

## Types

```typescript { .api }
interface EncodedRecord {
  /** The partition key for the aggregated record */
  partitionKey: string;
  /** The encoded aggregated record data (Protocol Buffer + magic number + checksum) */
  data: Buffer;
  /** Optional explicit hash key for shard allocation */
  ExplicitHashKey?: string;
}

interface InputUserRecord {
  /** The partition key for this user record */
  partitionKey: string;
  /** The data payload for this user record */
  data: Buffer | string;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}
```