
# Stream Summarization and Top-K

Algorithms for tracking the most frequent items and maintaining stream summaries with error bounds. These data structures efficiently identify heavy hitters and maintain top-K lists in streaming data scenarios.

## Capabilities

### ITopK Interface

Common interface for top-K tracking algorithms.

```java { .api }
/**
 * Interface for top-K element tracking
 */
public interface ITopK<T> {
    /**
     * Add a single element to the tracker
     * @param element element to add
     * @return false if the element was already tracked, true if it was newly added
     */
    boolean offer(T element);

    /**
     * Add an element with the specified count increment
     * @param element element to add
     * @param incrementCount count to add for this element
     * @return false if the element was already tracked, true if it was newly added
     */
    boolean offer(T element, int incrementCount);

    /**
     * Get the top k elements
     * @param k number of top elements to return
     * @return list of up to k elements in descending order by frequency
     */
    List<T> peek(int k);
}
```
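
To make the three-method contract concrete, here is a stdlib-only sketch of an *exact* tracker with the same operations. The class name `ExactTopK` is hypothetical and it does not implement stream-lib's `ITopK` interface. It keeps one counter per distinct element, so memory grows with the number of distinct items. That makes it unsuitable for real streams, but it is useful as a reference oracle when testing an approximate tracker on small inputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical exact tracker with the same three operations as ITopK (unbounded memory). */
class ExactTopK<T> {
    private final Map<T, Long> counts = new HashMap<>();

    /** Counts one occurrence; returns true only if the element was not tracked before. */
    boolean offer(T element) {
        return offer(element, 1);
    }

    /** Adds incrementCount occurrences; returns true only if the element was not tracked before. */
    boolean offer(T element, int incrementCount) {
        boolean isNew = !counts.containsKey(element);
        counts.merge(element, (long) incrementCount, Long::sum);
        return isNew;
    }

    /** Returns up to k elements, highest count first. */
    List<T> peek(int k) {
        List<Map.Entry<T, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<T> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    /** Exact count seen so far (0 if never offered). */
    long count(T element) {
        return counts.getOrDefault(element, 0L);
    }
}
```

Offering `"apple"`, `"banana"`, `"apple"` and then `"cherry"` with an increment of 5 makes `peek(2)` return `[cherry, apple]`.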

### StreamSummary

Space-Saving algorithm implementation for maintaining stream summaries and tracking top-K most frequent elements with error bounds.

```java { .api }
/**
 * Space-Saving algorithm for stream summarization and top-K tracking
 */
public class StreamSummary<T> implements ITopK<T>, Externalizable {
    /**
     * Create StreamSummary with specified capacity
     * @param capacity maximum number of items to track
     */
    public StreamSummary(int capacity);

    /**
     * Create empty StreamSummary for deserialization
     */
    public StreamSummary();

    /**
     * Create StreamSummary from serialized bytes
     * @param bytes serialized StreamSummary data
     * @throws IOException if deserialization fails
     */
    public StreamSummary(byte[] bytes) throws IOException;

    /**
     * Get capacity of the summary
     * @return maximum number of items tracked
     */
    public int getCapacity();

    /**
     * Add item to summary
     * @param item item to add
     * @return false if the item was already tracked, true if it was newly added
     */
    public boolean offer(T item);

    /**
     * Add item with count to summary
     * @param item item to add
     * @param incrementCount count to add
     * @return false if the item was already tracked, true if it was newly added
     */
    public boolean offer(T item, int incrementCount);

    /**
     * Add item and return the item evicted to make room for it, if any
     * @param item item to add
     * @param incrementCount count to add
     * @return item that was dropped, or null if none
     */
    public T offerReturnDropped(T item, int incrementCount);

    /**
     * Add item and return both the offer() result and the dropped item
     * @param item item to add
     * @param incrementCount count to add
     * @return Pair of (isNewItem, droppedItem); droppedItem is null if nothing was dropped
     */
    public Pair<Boolean, T> offerReturnAll(T item, int incrementCount);

    /**
     * Get top k items
     * @param k number of items to return
     * @return list of up to k items
     */
    public List<T> peek(int k);

    /**
     * Get top k items with their counters (including error bounds)
     * @param k number of items to return
     * @return list of Counter objects with items, counts, and error bounds
     */
    public List<Counter<T>> topK(int k);

    /**
     * Get current size (number of tracked items)
     * @return current number of tracked items
     */
    public int size();

    /**
     * Deserialize from byte array
     * @param bytes serialized data
     * @throws IOException if deserialization fails
     */
    public void fromBytes(byte[] bytes) throws IOException;

    /**
     * Serialize to byte array
     * @return serialized data
     * @throws IOException if serialization fails
     */
    public byte[] toBytes() throws IOException;
}
```
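
The Space-Saving update rule can be sketched with the standard library alone. This is a minimal illustration, not stream-lib's code. The class name `SpaceSavingSketch` is hypothetical, and it finds the smallest counter with a linear scan (O(capacity) per eviction) rather than the bucket list a production implementation would use. The core rule is this: when a new item arrives and all `capacity` slots are full, the item with the smallest count is evicted, and the newcomer inherits that count both as its starting count and as its error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified Space-Saving summary (linear-scan eviction). */
class SpaceSavingSketch<T> {
    /** One monitored item: count is an upper bound on its frequency, count - error a lower bound. */
    static final class Monitored<T> {
        final T item;
        final long count;
        final long error;

        Monitored(T item, long count, long error) {
            this.item = item;
            this.count = count;
            this.error = error;
        }
    }

    private final int capacity;
    private final Map<T, long[]> counters = new HashMap<>(); // item -> {count, error}

    SpaceSavingSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    /** Adds incrementCount occurrences of item; returns the evicted item, or null. */
    T offer(T item, int incrementCount) {
        long[] slot = counters.get(item);
        T dropped = null;
        if (slot == null) {
            if (counters.size() < capacity) {
                slot = new long[] {0, 0}; // free slot: exact so far, error 0
            } else {
                // Evict the item with the smallest count; the newcomer inherits it as error.
                T minItem = null;
                long minCount = Long.MAX_VALUE;
                for (Map.Entry<T, long[]> e : counters.entrySet()) {
                    if (e.getValue()[0] < minCount) {
                        minCount = e.getValue()[0];
                        minItem = e.getKey();
                    }
                }
                counters.remove(minItem);
                dropped = minItem;
                slot = new long[] {minCount, minCount};
            }
            counters.put(item, slot);
        }
        slot[0] += incrementCount;
        return dropped;
    }

    /** Returns up to k monitored items, highest count first. */
    List<Monitored<T>> topK(int k) {
        List<Monitored<T>> all = new ArrayList<>();
        for (Map.Entry<T, long[]> e : counters.entrySet()) {
            all.add(new Monitored<>(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        all.sort((a, b) -> Long.compare(b.count, a.count));
        return new ArrayList<>(all.subList(0, Math.min(k, all.size())));
    }
}
```

With capacity 2, the stream `a a a b c` evicts `b` when `c` arrives. `c` then reports count 2 with error 1, so its true frequency lies between 1 and 2.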

**Usage Examples:**

```java
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;
import java.util.List;

// Create summary to track top 100 items
StreamSummary<String> summary = new StreamSummary<>(100);

// Process stream data
summary.offer("apple");
summary.offer("banana");
summary.offer("apple");      // apple count is now 2
summary.offer("cherry", 5);  // add cherry with count 5

// Get top items
List<String> top10 = summary.peek(10);
List<Counter<String>> top10WithCounts = summary.topK(10);

// Print results; the count is an upper bound and count - error a lower bound
for (Counter<String> counter : top10WithCounts) {
    System.out.println(counter.getItem() + ": " + counter.getCount()
            + " (true count >= " + (counter.getCount() - counter.getError()) + ")");
}
```

### ConcurrentStreamSummary

Thread-safe version of StreamSummary for concurrent access scenarios.

```java { .api }
/**
 * Thread-safe version of StreamSummary
 */
public class ConcurrentStreamSummary<T> implements ITopK<T> {
    /**
     * Create concurrent stream summary with specified capacity
     * @param capacity maximum number of items to track
     */
    public ConcurrentStreamSummary(final int capacity);

    /**
     * Add element (thread-safe)
     * @param element element to add
     * @return true if element was added or updated
     */
    public boolean offer(final T element);

    /**
     * Add element with count (thread-safe)
     * @param element element to add
     * @param incrementCount count to add
     * @return true if element was added or updated
     */
    public boolean offer(final T element, final int incrementCount);

    /**
     * Get top k elements (thread-safe)
     * @param k number of elements to return
     * @return list of top k elements
     */
    public List<T> peek(final int k);

    /**
     * Get top k elements with scores (thread-safe)
     * @param k number of elements to return
     * @return list of ScoredItem objects with counts and error bounds
     */
    public List<ScoredItem<T>> peekWithScores(final int k);
}
```

### Counter

Represents a counter for tracking item frequency with error bounds.

```java { .api }
/**
 * Counter with item, count, and error bound information
 */
public class Counter<T> implements Externalizable {
    /**
     * Create empty counter for deserialization
     */
    public Counter();

    /**
     * Get the tracked item
     * @return the item being counted
     */
    public T getItem();

    /**
     * Get the count value
     * @return current count for the item (an upper bound on its true frequency)
     */
    public long getCount();

    /**
     * Get the error bound
     * @return maximum amount by which getCount() may overestimate the true frequency
     */
    public long getError();

    /**
     * String representation
     */
    public String toString();
}
```
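
A Space-Saving counter can only overestimate, so the count and error together bracket the true frequency: it lies between `getCount() - getError()` and `getCount()`. The helper below is a hypothetical, stdlib-only sketch (`CountBounds` is not part of the library). It takes the two values as plain `long`s, so it does not depend on the `Counter` class, and it classifies an item against a frequency threshold.

```java
/** Hypothetical helper for reading a (count, error) pair from a Space-Saving counter. */
final class CountBounds {
    enum Verdict { GUARANTEED, POSSIBLE, BELOW }

    private CountBounds() {}

    /** Lower bound on the true frequency. */
    static long lower(long count, long error) {
        return count - error;
    }

    /** Upper bound on the true frequency (the reported count itself). */
    static long upper(long count) {
        return count;
    }

    /**
     * GUARANTEED if even the lower bound reaches threshold * total,
     * POSSIBLE if only the upper bound does, BELOW otherwise.
     */
    static Verdict classify(long count, long error, long total, double threshold) {
        double cutoff = threshold * total;
        if (lower(count, error) >= cutoff) return Verdict.GUARANTEED;
        if (upper(count) >= cutoff) return Verdict.POSSIBLE;
        return Verdict.BELOW;
    }
}
```

For example, with 10,000 items seen and a 1% threshold (cutoff 100), a counter reporting count 120 and error 30 is only a *possible* heavy hitter. Its true frequency could be as low as 90.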

### ScoredItem

Thread-safe item with count and error tracking for concurrent operations.

```java { .api }
/**
 * Thread-safe item with atomic count operations
 */
public class ScoredItem<T> implements Comparable<ScoredItem<T>> {
    /**
     * Create scored item with count and error
     * @param item the item
     * @param count initial count
     * @param error initial error bound
     */
    public ScoredItem(final T item, final long count, final long error);

    /**
     * Create scored item with count (error defaults to 0)
     * @param item the item
     * @param count initial count
     */
    public ScoredItem(final T item, final long count);

    /**
     * Atomically add to count and return new value
     * @param delta amount to add
     * @return new count value
     */
    public long addAndGetCount(final long delta);

    /**
     * Set error bound
     * @param newError new error value
     */
    public void setError(final long newError);

    /**
     * Get error bound
     * @return current error bound
     */
    public long getError();

    /**
     * Get the item
     * @return the tracked item
     */
    public T getItem();

    /**
     * Check if this is a new item
     * @return true if item is new
     */
    public boolean isNewItem();

    /**
     * Get current count
     * @return current count value
     */
    public long getCount();

    /**
     * Set new item flag
     * @param newItem whether item is new
     */
    public void setNewItem(final boolean newItem);

    /**
     * Compare by count (for sorting)
     * @param o other ScoredItem
     * @return comparison result
     */
    public int compareTo(final ScoredItem<T> o);
}
```
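
Atomic count updates matter because several threads may bump the same item at once. The sketch below uses `java.util.concurrent.atomic.AtomicLong` directly to show that no increment is lost under contention. `AtomicScore` is a hypothetical class that only mirrors the shape of `addAndGetCount`; it is not stream-lib's `ScoredItem`. Its "higher count sorts first" ordering is likewise this sketch's own choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical scored item whose count is updated with atomic read-modify-write operations. */
final class AtomicScore<T> implements Comparable<AtomicScore<T>> {
    private final T item;
    private final AtomicLong count;

    AtomicScore(T item, long initialCount) {
        this.item = item;
        this.count = new AtomicLong(initialCount);
    }

    T getItem() { return item; }

    long getCount() { return count.get(); }

    /** Atomically adds delta and returns the updated count. */
    long addAndGetCount(long delta) { return count.addAndGet(delta); }

    /** Orders higher counts first so a sorted list reads like a top-K list. */
    @Override
    public int compareTo(AtomicScore<T> other) {
        return Long.compare(other.getCount(), getCount());
    }

    /** Starts `threads` threads that each add 1 `perThread` times; returns the final count. */
    static long hammer(int threads, int perThread) {
        AtomicScore<String> score = new AtomicScore<>("x", 0);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    score.addAndGetCount(1);
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return score.getCount();
    }
}
```

`hammer(4, 10_000)` always returns 40,000. A plain `long` field updated with `count += 1` from the same threads could lose updates.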

### StochasticTopper

Stochastic algorithm for finding most frequent items using reservoir sampling techniques.

```java { .api }
/**
 * Stochastic top-K algorithm using reservoir sampling
 */
public class StochasticTopper<T> implements ITopK<T> {
    /**
     * Create stochastic topper with sample size
     * @param sampleSize size of the internal sample
     */
    public StochasticTopper(int sampleSize);

    /**
     * Create stochastic topper with sample size and seed
     * @param sampleSize size of the internal sample
     * @param seed random seed for reproducible results
     */
    public StochasticTopper(int sampleSize, Long seed);

    /**
     * Add item with count
     * @param item item to add
     * @param incrementCount count to add
     * @return true if item was processed
     */
    public boolean offer(T item, int incrementCount);

    /**
     * Add single item
     * @param item item to add
     * @return true if item was processed
     */
    public boolean offer(T item);

    /**
     * Get top k items
     * @param k number of items to return
     * @return list of top k items
     */
    public List<T> peek(int k);
}
```
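
Reservoir sampling is the core idea here. The stdlib-only sketch below keeps a uniform random sample of `sampleSize` stream positions using the classic Algorithm R, then ranks items by how often they appear in that sample. It illustrates the technique only and is not stream-lib's `StochasticTopper` code. The class name `ReservoirTopK` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical top-K estimator built on a uniform reservoir sample (Algorithm R). */
final class ReservoirTopK<T> {
    private final List<T> reservoir;
    private final int sampleSize;
    private final Random random;
    private int seen = 0; // number of elements offered so far

    ReservoirTopK(int sampleSize, long seed) {
        this.sampleSize = sampleSize;
        this.reservoir = new ArrayList<>(sampleSize);
        this.random = new Random(seed); // fixed seed => reproducible sample
    }

    /** Offers one stream element; returns true if it entered the sample. */
    boolean offer(T item) {
        seen++;
        if (reservoir.size() < sampleSize) {
            reservoir.add(item); // fill phase: keep everything
            return true;
        }
        // Keep the new element with probability sampleSize / seen, replacing a random slot.
        int j = random.nextInt(seen);
        if (j < sampleSize) {
            reservoir.set(j, item);
            return true;
        }
        return false;
    }

    int sampleCount() {
        return reservoir.size();
    }

    /** Returns up to k items, most frequent within the sample first. */
    List<T> peek(int k) {
        Map<T, Integer> freq = new HashMap<>();
        for (T item : reservoir) {
            freq.merge(item, 1, Integer::sum);
        }
        List<Map.Entry<T, Integer>> entries = new ArrayList<>(freq.entrySet());
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        List<T> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }
}
```

Memory stays at `sampleSize` slots regardless of stream length. The trade-off is that rankings are only approximate, and rare items may not appear in the sample at all.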

### ISampleSet Interface

Interface for sample set operations used in some stream summarization algorithms.

```java { .api }
/**
 * Interface for sample set operations
 */
public interface ISampleSet<T> {
    /**
     * Add element to sample set
     * @param element element to add
     * @return count after addition
     */
    long put(T element);

    /**
     * Add element with count to sample set
     * @param element element to add
     * @param incrementCount count to add
     * @return count after addition
     */
    long put(T element, int incrementCount);

    /**
     * Remove random element from sample set
     * @return removed element, or null if empty
     */
    T removeRandom();

    /**
     * Get top element without removing
     * @return top element, or null if empty
     */
    T peek();

    /**
     * Get top k elements without removing
     * @param k number of elements to return
     * @return list of top k elements
     */
    List<T> peek(int k);

    /**
     * Get current size
     * @return number of unique elements
     */
    int size();

    /**
     * Get total count
     * @return sum of all element counts
     */
    long count();
}
```

### SampleSet

Implementation of ISampleSet with frequency-based ordering.

```java { .api }
/**
 * Sample set implementation with frequency-based ordering
 */
public class SampleSet<T> implements ISampleSet<T> {
    /**
     * Create sample set with default capacity (7)
     */
    public SampleSet();

    /**
     * Create sample set with specified capacity
     * @param capacity maximum number of elements to track
     */
    public SampleSet(int capacity);

    /**
     * Create sample set with capacity and custom random generator
     * @param capacity maximum number of elements to track
     * @param random random number generator to use
     */
    public SampleSet(int capacity, Random random);
}
```

## Usage Patterns

### Basic Top-K Tracking

```java
// Track top 20 most frequent items; dataStream is any Iterable<String>
StreamSummary<String> topItems = new StreamSummary<>(20);

// Process stream data
for (String item : dataStream) {
    topItems.offer(item);
}

// Get top 10 with counts and error bounds
List<Counter<String>> top10 = topItems.topK(10);

for (Counter<String> counter : top10) {
    // The true count lies in [count - error, count]
    System.out.printf("%s: true count in [%d, %d]%n",
            counter.getItem(),
            counter.getCount() - counter.getError(),
            counter.getCount());
}
```

### Heavy Hitters Detection

```java
StreamSummary<String> heavyHitters = new StreamSummary<>(100);
long totalCount = 0;
double threshold = 0.01; // 1% threshold

for (String item : dataStream) {
    heavyHitters.offer(item);
    totalCount++;

    // Periodically check for heavy hitters
    if (totalCount % 10000 == 0) {
        // Up to 1/threshold items can exceed the threshold, so examine every monitored counter
        List<Counter<String>> candidates = heavyHitters.topK(heavyHitters.getCapacity());

        for (Counter<String> counter : candidates) {
            double upper = (double) counter.getCount() / totalCount;
            if (upper < threshold) {
                continue;
            }
            // count - error is a lower bound; only then is the item certainly above threshold
            double lower = (double) (counter.getCount() - counter.getError()) / totalCount;
            String kind = lower >= threshold ? "Heavy hitter" : "Possible heavy hitter";
            System.out.println(kind + ": " + counter.getItem()
                    + String.format(" (%.2f%% to %.2f%%)", lower * 100, upper * 100));
        }
    }
}
```

### Concurrent Top-K Tracking

```java
import com.clearspring.analytics.stream.ConcurrentStreamSummary;
import com.clearspring.analytics.stream.ScoredItem;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Thread-safe version for concurrent access
ConcurrentStreamSummary<String> concurrentSummary =
        new ConcurrentStreamSummary<>(50);

// Multiple threads can safely add items
ExecutorService executor = Executors.newFixedThreadPool(4);

for (int i = 0; i < 4; i++) {
    executor.submit(() -> {
        // threadLocalData stands for each worker's own Iterable<String> input
        for (String item : threadLocalData) {
            concurrentSummary.offer(item);
        }
    });
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // throws InterruptedException

// Get results
List<ScoredItem<String>> topItems = concurrentSummary.peekWithScores(10);
```

### Stochastic Sampling for Large Streams

```java
// Use stochastic approach for very large streams
StochasticTopper<String> sampler = new StochasticTopper<>(1000);

// Process massive stream efficiently
for (String item : massiveDataStream) {
    sampler.offer(item);
}

// Get approximate top items
List<String> approximateTop10 = sampler.peek(10);

System.out.println("Approximate top items: " + approximateTop10);
```

### Stream Summary Persistence

```java
StreamSummary<String> summary = new StreamSummary<>(100);

// Process batch 1
for (String item : batch1) {
    summary.offer(item);
}

// Serialize state (toBytes() throws IOException)
byte[] serialized = summary.toBytes();
// saveToDatabase/loadFromDatabase are placeholders for your own storage layer
saveToDatabase("stream_summary_state", serialized);

// Later, restore and continue (the byte[] constructor throws IOException)
byte[] restored = loadFromDatabase("stream_summary_state");
StreamSummary<String> restoredSummary = new StreamSummary<>(restored);

// Continue processing
for (String item : batch2) {
    restoredSummary.offer(item);
}
```

### Capacity Management and Monitoring

```java
StreamSummary<String> summary = new StreamSummary<>(50);

for (String item : dataStream) {
    String dropped = summary.offerReturnDropped(item, 1);

    if (dropped != null) {
        // An existing item was evicted to make room for this one
        System.out.println("Dropped item: " + dropped);

        // Could log metrics, adjust capacity, etc. (updateDroppedItemMetrics is a placeholder)
        updateDroppedItemMetrics(dropped);
    }
}

// Monitor current capacity usage
System.out.println("Items tracked: " + summary.size() + "/" + summary.getCapacity());
```

## Algorithm Selection Guidelines

### StreamSummary vs StochasticTopper

**Use StreamSummary when**:

- Guaranteed (deterministic) per-item error bounds are needed
- Memory usage must be predictable and bounded by the capacity
- Deterministic, reproducible results are required
- You need each item's estimated frequency, not just its rank

**Use StochasticTopper when**:

- Approximate results are acceptable
- Very large streams need processing
- Memory must be strictly limited to a fixed sample size
- Randomized results are acceptable

### Concurrent vs Single-threaded

**Use ConcurrentStreamSummary when**:

- Multiple threads need to add items simultaneously
- Thread safety is required
- Some overhead under contention is acceptable

**Use regular StreamSummary when**:

- Access is single-threaded
- Maximum performance is needed
- External synchronization is available

## Performance Characteristics

**StreamSummary**:

- Space: O(capacity)
- Insert: O(1) for unit increments via the Space-Saving bucket list; larger increments may step past several count buckets
- Query: O(k) for top-k
- Guarantees: deterministic error bounds

**ConcurrentStreamSummary**:

- Space: O(capacity)
- Insert: O(log capacity) with synchronization overhead
- Query: O(k) for top-k with synchronization
- Thread-safe operations

**StochasticTopper**:

- Space: O(sample size)
- Insert: O(1) amortized
- Query: O(k log k) for top-k
- Probabilistic accuracy

## Error Bound Guarantees

StreamSummary provides the following guarantee: for any monitored item with true frequency `f`, the estimated frequency `f'` (the counter's count) satisfies:

`f ≤ f' ≤ f + ε`

where `ε = total_stream_size / capacity` is the maximum possible error. Each counter's own `getError()` value is a tighter, per-item bound: the true frequency lies in `[f' - error, f']`.

This means the algorithm never underestimates the frequency of a monitored item, and the overestimation is bounded by the total number of items seen divided by the capacity.
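
The bound follows from two invariants of the Space-Saving update, shown here as a short derivation. Notation: `N` is the total count offered (the sum of all increments), `m` the capacity, `c_i` the monitored counts, `c_min` the smallest of them, and `e_x` the error recorded for item `x`. The derivation assumes all `m` slots are filled; before that, every error is 0. It also uses the fact that counts only grow, so `c_min` never decreases.

```latex
\begin{aligned}
\sum_{i=1}^{m} c_i &= N
  && \text{every offered unit adds exactly 1 to the monitored total} \\
c_{\min} &\le \frac{1}{m}\sum_{i=1}^{m} c_i = \frac{N}{m}
  && \text{the minimum is at most the mean} \\
e_x &= c_{\min}(t_x) \le c_{\min} \le \frac{N}{m}
  && x \text{ inherits the minimum when admitted at time } t_x \\
f_x &\le c_x \le f_x + e_x \le f_x + \frac{N}{m}
  && \text{hence } \varepsilon = N/m
\end{aligned}
```

For example, with `capacity = 100` and 10,000 items offered, no monitored count exceeds its true frequency by more than 100, which is 1% of the stream.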