# Record Aggregator Class

The RecordAggregator class provides fine-grained control over record aggregation with manual size management, buffering, and flushing capabilities. It is ideal for applications that need precise control over when aggregated records are generated and sent.

## Capabilities

### Constructor

Creates a new RecordAggregator instance with an optional callback for when records are ready.

```javascript { .api }
/**
 * Create a new RecordAggregator instance
 * @param onReadyCallback - Optional callback for when aggregated records are ready
 */
constructor(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void);
```

**Usage Example:**

```javascript
const { RecordAggregator } = require('aws-kinesis-agg');

// Create aggregator with callback
const aggregator = new RecordAggregator((err, encodedRecord) => {
  if (err) {
    console.error('Aggregation error:', err);
  } else {
    console.log('Record ready:', encodedRecord.partitionKey, encodedRecord.data.length);
    // Send to Kinesis or other destination
  }
});

// Create aggregator without callback (manual handling)
const manualAggregator = new RecordAggregator();
```

### Add User Record

Adds a user record to the aggregator's internal buffer. Automatically triggers the callback when the buffer reaches the size limit.

```javascript { .api }
/**
 * Add a user record to the aggregator
 * @param record - The user record to add
 * @throws Error if record won't fit or is too large
 */
addUserRecord(record: InputUserRecord): void;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator((err, encodedRecord) => {
  // Handle the automatically generated encoded record
  sendToKinesis(encodedRecord);
});

try {
  // Add records one by one
  aggregator.addUserRecord({
    partitionKey: 'user-123',
    data: Buffer.from('user data 1')
  });

  aggregator.addUserRecord({
    partitionKey: 'user-456',
    data: Buffer.from('user data 2'),
    explicitHashKey: 'shard-key-456'
  });

  // When the internal buffer reaches ~1MB, the callback will be triggered automatically
} catch (err) {
  console.error('Failed to add record:', err.message);
  // Possible errors:
  // - "Record.Data field is mandatory"
  // - "record.partitionKey field is mandatory"
  // - "record won't fit"
  // - "Input record (...) is too large to fit inside a single Kinesis record"
}
```

### Build Encoded Record

Manually builds an encoded record from all buffered user records and clears the buffer.

```javascript { .api }
/**
 * Build an encoded record from buffered user records and clear the buffer
 * @returns The encoded record, or undefined if no records are buffered
 */
build(): EncodedRecord | undefined;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

// Add records manually
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });

// Manually build the encoded record
const encodedRecord = aggregator.build();
if (encodedRecord) {
  console.log('Built record:', encodedRecord.partitionKey, encodedRecord.data.length);
  // Send to Kinesis or other destination
  sendToKinesis(encodedRecord);
}

// Buffer is now empty, ready for more records
console.log('Records in buffer:', aggregator.length()); // 0
```

### Flush Buffered Records

Triggers the callback with any buffered records and clears the buffer.

```javascript { .api }
/**
 * Flush buffered records through the callback and clear the buffer
 * @param onReadyCallback - Optional callback, uses instance callback if not provided
 */
flushBufferedRecords(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): void;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });
aggregator.addUserRecord({ partitionKey: 'key2', data: Buffer.from('data2') });

// Flush with custom callback
aggregator.flushBufferedRecords((err, encodedRecord) => {
  if (err) {
    console.error('Flush error:', err);
  } else if (encodedRecord) {
    console.log('Flushed record:', encodedRecord.data.length);
    sendToKinesis(encodedRecord);
  }
});

// Buffer is now empty
```

### Record Count

Returns the number of user records currently in the buffer.

```javascript { .api }
/**
 * Get the number of user records currently buffered
 * @returns Number of buffered records
 */
length(): number;
```

### Check Record Fit

Checks if a user record would fit in the current buffer without adding it.

```javascript { .api }
/**
 * Check if a user record would fit in the current buffer
 * @param record - The record to check
 * @returns True if the record would fit, false otherwise
 */
checkIfUserRecordFits(record: InputUserRecord): boolean;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

// Add some records
aggregator.addUserRecord({ partitionKey: 'key1', data: Buffer.from('data1') });

// Check if another record would fit
const largeRecord = { partitionKey: 'key2', data: Buffer.alloc(500000) }; // ~500KB

if (aggregator.checkIfUserRecordFits(largeRecord)) {
  aggregator.addUserRecord(largeRecord);
  console.log('Large record added');
} else {
  // Flush current buffer first
  aggregator.flushBufferedRecords();
  // Then add the large record
  aggregator.addUserRecord(largeRecord);
  console.log('Buffer flushed, large record added');
}
```

### Calculate Record Size

Calculates the size a user record would consume when encoded, without adding it to the buffer.

```javascript { .api }
/**
 * Calculate the size a user record would consume when encoded
 * @param record - The record to calculate size for
 * @returns Size in bytes
 * @throws Error if record is invalid
 */
calculateUserRecordSize(record: InputUserRecord): number;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

const record = { partitionKey: 'test-key', data: Buffer.from('test data') };

try {
  const size = aggregator.calculateUserRecordSize(record);
  console.log(`Record would consume ${size} bytes when encoded`);

  // Use size information for batching decisions.
  // getCurrentBufferSize() is a hypothetical helper that tracks your own
  // running total of the sizes returned by calculateUserRecordSize().
  const remainingCapacity = 1048576 - getCurrentBufferSize(); // 1MB limit
  if (size <= remainingCapacity) {
    aggregator.addUserRecord(record);
  } else {
    console.log('Record too large for current buffer, flushing first');
    aggregator.flushBufferedRecords();
    aggregator.addUserRecord(record);
  }
} catch (err) {
  console.error('Invalid record:', err.message);
}
```

### Clear Records

Resets the aggregator to empty state, discarding all buffered records.

```javascript { .api }
/**
 * Reset aggregator to empty state, discarding all buffered records
 */
clearRecords(): void;
```

### Set Callback

Updates the callback function used by automatic flushing and manual flush operations.

```javascript { .api }
/**
 * Set or update the onReadyCallback function
 * @param onReadyCallback - The new callback function
 * @returns The current callback function
 */
setOnReadyCallback(onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void): Function;
```

### Aggregate Records (Batch)

Adds multiple records at once with optional automatic flushing.

```javascript { .api }
/**
 * Add multiple records to the aggregator
 * @param records - Array of user records to add
 * @param forceFlush - If true, flush buffered records at the end
 * @param onReadyCallback - Optional callback for this operation
 */
aggregateRecords(
  records: InputUserRecord[],
  forceFlush: boolean,
  onReadyCallback?: (err: Error, encodedRecord?: EncodedRecord) => void
): void;
```

**Usage Example:**

```javascript
const aggregator = new RecordAggregator();

const batchRecords = [
  { partitionKey: 'key1', data: Buffer.from('data1') },
  { partitionKey: 'key2', data: Buffer.from('data2') },
  { partitionKey: 'key3', data: Buffer.from('data3') }
];

// Add all records and force flush
aggregator.aggregateRecords(batchRecords, true, (err, encodedRecord) => {
  if (err) {
    console.error('Batch aggregation error:', err);
  } else if (encodedRecord) {
    console.log('Batch encoded:', encodedRecord.data.length);
    sendToKinesis(encodedRecord);
  }
});
```

## Advanced Usage Patterns

### Stream Processing

```javascript
const { RecordAggregator } = require('aws-kinesis-agg');
const stream = require('stream');

class KinesisAggregatorStream extends stream.Transform {
  constructor(options = {}) {
    super({ objectMode: true });

    this.aggregator = new RecordAggregator((err, encodedRecord) => {
      if (err) {
        this.emit('error', err);
      } else {
        this.push(encodedRecord);
      }
    });
  }

  _transform(record, encoding, callback) {
    try {
      if (this.aggregator.checkIfUserRecordFits(record)) {
        this.aggregator.addUserRecord(record);
      } else {
        // Flush current buffer, then add new record
        this.aggregator.flushBufferedRecords();
        this.aggregator.addUserRecord(record);
      }
      callback();
    } catch (err) {
      callback(err);
    }
  }

  _flush(callback) {
    // Flush any remaining records
    this.aggregator.flushBufferedRecords();
    callback();
  }
}
```

### Size-Based Batching

```javascript
const aggregator = new RecordAggregator();
const TARGET_SIZE = 512 * 1024; // 512KB target size
let bufferedSize = 0; // running total of encoded sizes added since the last flush

function addRecordWithSizeControl(record) {
  const recordSize = aggregator.calculateUserRecordSize(record);

  if (bufferedSize + recordSize > TARGET_SIZE) {
    // Flush before adding if we'd exceed the target size
    aggregator.flushBufferedRecords();
    bufferedSize = 0;
  }

  aggregator.addUserRecord(record);
  bufferedSize += recordSize;
}
```

## Types

```typescript { .api }
interface InputUserRecord {
  /** The partition key for this user record */
  partitionKey: string;
  /** The data payload for this user record */
  data: Buffer | string;
  /** Optional explicit hash key for shard allocation */
  explicitHashKey?: string;
}

interface EncodedRecord {
  /** The partition key for the aggregated record */
  partitionKey: string;
  /** The encoded aggregated record data */
  data: Buffer;
  /** Optional explicit hash key for shard allocation */
  ExplicitHashKey?: string;
}
```