# Stream Summarization and Top-K

Algorithms for tracking the most frequent items in a stream and maintaining stream summaries with error bounds. These data structures identify heavy hitters and maintain top-K lists using memory bounded by a fixed capacity or sample size.

## Capabilities

### ITopK Interface

Common interface for top-K tracking algorithms.

```java { .api }
/**
 * Interface for top-K element tracking
 */
public interface ITopK<T> {
    /**
     * Add a single element to the tracker
     * @param element element to add
     * @return false if the element was already tracked, true otherwise
     */
    boolean offer(T element);

    /**
     * Add an element with the specified count increment
     * @param element element to add
     * @param incrementCount count to add for this element
     * @return false if the element was already tracked, true otherwise
     */
    boolean offer(T element, int incrementCount);

    /**
     * Get the top k elements
     * @param k number of top elements to return
     * @return up to k elements in descending order by estimated frequency (may be fewer than k)
     */
    List<T> peek(int k);
}
```
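To make the contract concrete, here is a hypothetical exact baseline written against a local copy of the interface. `TopK` and `ExactTopK` are illustrative names, not library types, and the rule that `offer` returns `true` only for a not-yet-tracked element is an assumption of this sketch. It remembers every distinct element, so its memory grows with the number of distinct items; that unbounded growth is exactly what the bounded trackers below avoid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the shape of ITopK (hypothetical; not the library type).
interface TopK<T> {
    boolean offer(T element);

    boolean offer(T element, int incrementCount);

    List<T> peek(int k);
}

// Hypothetical exact baseline: remembers every distinct element, so memory
// grows with the number of distinct items seen.
class ExactTopK<T> implements TopK<T> {
    private final Map<T, Long> counts = new HashMap<>();

    @Override
    public boolean offer(T element) {
        return offer(element, 1);
    }

    @Override
    public boolean offer(T element, int incrementCount) {
        // Assumption of this sketch: true only if the element was not tracked yet.
        boolean isNew = !counts.containsKey(element);
        counts.merge(element, (long) incrementCount, Long::sum);
        return isNew;
    }

    @Override
    public List<T> peek(int k) {
        List<Map.Entry<T, Long>> entries = new ArrayList<>(counts.entrySet());
        // Highest count first.
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<T> top = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }
}
```

An exact tracker like this is mainly useful in tests, as ground truth for checking a bounded summary on small inputs.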

### StreamSummary

Space-Saving algorithm implementation that maintains a stream summary and tracks the top-K most frequent elements with per-item error bounds. The summary holds at most `capacity` counters; when it is full and a new item arrives, the counter with the smallest count is reassigned to the new item, and that smallest count becomes the new item's error.

```java { .api }
/**
 * Space-Saving algorithm for stream summarization and top-K tracking
 */
public class StreamSummary<T> implements ITopK<T>, Externalizable {
    /**
     * Create StreamSummary with specified capacity
     * @param capacity maximum number of items to track
     */
    public StreamSummary(int capacity);

    /**
     * Create empty StreamSummary for deserialization
     */
    public StreamSummary();

    /**
     * Create StreamSummary from serialized bytes
     * @param bytes serialized StreamSummary data
     * @throws IOException if deserialization fails
     */
    public StreamSummary(byte[] bytes) throws IOException;

    /**
     * Get capacity of the summary
     * @return maximum number of items tracked
     */
    public int getCapacity();

    /**
     * Add item to summary
     * @param item item to add
     * @return true if the item was newly added, false if it was already tracked
     */
    public boolean offer(T item);

    /**
     * Add item with count to summary
     * @param item item to add
     * @param incrementCount count to add
     * @return true if the item was newly added, false if it was already tracked
     */
    public boolean offer(T item, int incrementCount);

    /**
     * Add item and return the item evicted to make room, if any
     * @param item item to add
     * @param incrementCount count to add
     * @return item that was dropped, or null if none
     */
    public T offerReturnDropped(T item, int incrementCount);

    /**
     * Add item and return both the "newly added" flag and the dropped item
     * @param item item to add
     * @param incrementCount count to add
     * @return Pair of (isNewItem, droppedItem); droppedItem is null if nothing was evicted
     */
    public Pair<Boolean, T> offerReturnAll(T item, int incrementCount);

    /**
     * Get top k items
     * @param k number of items to return
     * @return up to k items, highest estimated count first
     */
    public List<T> peek(int k);

    /**
     * Get top k items with their counters (including error bounds)
     * @param k number of items to return
     * @return up to k Counter objects with items, estimated counts, and error bounds
     */
    public List<Counter<T>> topK(int k);

    /**
     * Get current size (number of tracked items)
     * @return current number of tracked items, at most the capacity
     */
    public int size();

    /**
     * Deserialize from byte array
     * @param bytes serialized data
     * @throws IOException if deserialization fails
     */
    public void fromBytes(byte[] bytes) throws IOException;

    /**
     * Serialize to byte array
     * @return serialized data
     * @throws IOException if serialization fails
     */
    public byte[] toBytes() throws IOException;
}
```
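The eviction rule is the core of Space-Saving. The sketch below is a minimal, hypothetical re-implementation (`SpaceSavingSketch` is not a library class) showing how count and error are assigned when a full summary admits a new item: the newcomer inherits the smallest count as its error. Unlike StreamSummary, which keeps its counters in a linked list of buckets, this sketch finds the minimum with a linear scan, so eviction here costs O(capacity).

```java
import java.util.HashMap;
import java.util.Map;

// Minimal Space-Saving sketch (hypothetical; not the stream-lib source).
// The minimum counter is found by a linear scan, so eviction is O(capacity).
class SpaceSavingSketch<T> {
    private final int capacity;
    // item -> {estimated count, error}
    private final Map<T, long[]> counters = new HashMap<>();

    SpaceSavingSketch(int capacity) {
        this.capacity = capacity;
    }

    // Offers an item; returns the evicted item, or null if nothing was dropped.
    T offerReturnDropped(T item, int incrementCount) {
        long[] c = counters.get(item);
        if (c != null) {
            c[0] += incrementCount; // already tracked: just increment
            return null;
        }
        if (counters.size() < capacity) {
            counters.put(item, new long[] {incrementCount, 0}); // free slot, exact count
            return null;
        }
        // Full: the item with the smallest count gives up its counter.
        T victim = null;
        long minCount = Long.MAX_VALUE;
        for (Map.Entry<T, long[]> e : counters.entrySet()) {
            if (e.getValue()[0] < minCount) {
                minCount = e.getValue()[0];
                victim = e.getKey();
            }
        }
        counters.remove(victim);
        // The newcomer may have occurred up to minCount times while untracked,
        // so minCount becomes its error (its possible overestimate).
        counters.put(item, new long[] {minCount + incrementCount, minCount});
        return victim;
    }

    long count(T item) {
        long[] c = counters.get(item);
        return c == null ? 0 : c[0];
    }

    long error(T item) {
        long[] c = counters.get(item);
        return c == null ? 0 : c[1];
    }

    int size() {
        return counters.size();
    }
}
```

With capacity 2, offering `a, a, b, c` evicts `b` (count 1), and `c` ends up with count 2 and error 1: its true count, 1, lies between `count - error` and `count`.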

**Usage Examples:**

```java
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;
import java.util.List;

// Create a summary that tracks at most 100 distinct items
StreamSummary<String> summary = new StreamSummary<>(100);

// Process stream data
summary.offer("apple");
summary.offer("banana");
summary.offer("apple");      // apple count is now 2
summary.offer("cherry", 5);  // add cherry with count 5

// Get top items
List<String> top10 = summary.peek(10);
List<Counter<String>> top10WithCounts = summary.topK(10);

// Print results: the true count lies in [count - error, count]
for (Counter<String> counter : top10WithCounts) {
    long count = counter.getCount();
    long error = counter.getError();
    System.out.println(counter.getItem() + ": " + count
            + " (true count between " + (count - error) + " and " + count + ")");
}
```

### ConcurrentStreamSummary

Thread-safe version of StreamSummary for concurrent access scenarios.

```java { .api }
/**
 * Thread-safe version of StreamSummary
 */
public class ConcurrentStreamSummary<T> implements ITopK<T> {
    /**
     * Create concurrent stream summary with specified capacity
     * @param capacity maximum number of items to track
     */
    public ConcurrentStreamSummary(final int capacity);

    /**
     * Add element (thread-safe)
     * @param element element to add
     * @return true if element was added or updated
     */
    public boolean offer(final T element);

    /**
     * Add element with count (thread-safe)
     * @param element element to add
     * @param incrementCount count to add
     * @return true if element was added or updated
     */
    public boolean offer(final T element, final int incrementCount);

    /**
     * Get top k elements (thread-safe)
     * @param k number of elements to return
     * @return list of top k elements
     */
    public List<T> peek(final int k);

    /**
     * Get top k elements with scores (thread-safe)
     * @param k number of elements to return
     * @return list of ScoredItem objects with counts and error bounds
     */
    public List<ScoredItem<T>> peekWithScores(final int k);
}
```
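The alternative to a built-in concurrent structure is external synchronization: a single-threaded tracker guarded by one lock. The hypothetical `LockedCounter` below shows that pattern with a plain `HashMap`; the same idea applies to a `StreamSummary` by wrapping each `offer` and `topK` call in `synchronized (summary) { ... }`. It is simple and loses no updates, but every thread contends on the same monitor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example of external synchronization: one lock guards a
// non-thread-safe map, so all callers are serialized.
class LockedCounter<T> {
    private final Map<T, Long> counts = new HashMap<>();

    synchronized void offer(T item) {
        counts.merge(item, 1L, Long::sum);
    }

    synchronized long count(T item) {
        return counts.getOrDefault(item, 0L);
    }
}

class LockedCounterDemo {
    // Runs `threads` workers that each offer "x" `perThread` times.
    static long run(int threads, int perThread) {
        LockedCounter<String> counter = new LockedCounter<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.offer("x");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.count("x");
    }
}
```

Because every `offer` takes the lock, `LockedCounterDemo.run(4, 1000)` always reports 4000; the price is that the four workers never actually run their updates in parallel.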

### Counter

Holds an item's estimated count and the error bound on that estimate.

```java { .api }
/**
 * Counter with item, count, and error bound information
 */
public class Counter<T> implements Externalizable {
    /**
     * Create empty counter for deserialization
     */
    public Counter();

    /**
     * Get the tracked item
     * @return the item being counted
     */
    public T getItem();

    /**
     * Get the estimated count
     * @return estimated count for the item (never lower than the true count)
     */
    public long getCount();

    /**
     * Get the error bound
     * @return maximum amount by which getCount() may exceed the true count
     */
    public long getError();

    /**
     * String representation
     */
    public String toString();
}
```
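Because a Space-Saving count never undercounts and the error bounds the overcount, a `(count, error)` pair defines an interval that must contain the true frequency. The hypothetical helper below (not part of the library) turns the two values into that interval; with a real `Counter` you would pass `counter.getCount()` and `counter.getError()`.

```java
// Hypothetical helper: interprets a Space-Saving (count, error) pair as the
// interval [count - error, count] that contains the item's true frequency.
final class FrequencyRange {
    final long lower; // guaranteed occurrences
    final long upper; // the reported count is never an underestimate

    FrequencyRange(long count, long error) {
        this.lower = count - error;
        this.upper = count;
    }

    // True when the item certainly occurred more than `threshold` times.
    boolean certainlyAbove(long threshold) {
        return lower > threshold;
    }

    // True when the item might have occurred more than `threshold` times.
    boolean possiblyAbove(long threshold) {
        return upper > threshold;
    }

    @Override
    public String toString() {
        return "[" + lower + ", " + upper + "]";
    }
}
```

Reporting only items that are `certainlyAbove` a threshold avoids false positives; reporting items that are `possiblyAbove` it avoids false negatives.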

### ScoredItem

Thread-safe item with count and error tracking for concurrent operations.

```java { .api }
/**
 * Thread-safe item with atomic count operations
 */
public class ScoredItem<T> implements Comparable<ScoredItem<T>> {
    /**
     * Create scored item with count and error
     * @param item the item
     * @param count initial count
     * @param error initial error bound
     */
    public ScoredItem(final T item, final long count, final long error);

    /**
     * Create scored item with count (error defaults to 0)
     * @param item the item
     * @param count initial count
     */
    public ScoredItem(final T item, final long count);

    /**
     * Atomically add to count and return new value
     * @param delta amount to add
     * @return new count value
     */
    public long addAndGetCount(final long delta);

    /**
     * Set error bound
     * @param newError new error value
     */
    public void setError(final long newError);

    /**
     * Get error bound
     * @return current error bound
     */
    public long getError();

    /**
     * Get the item
     * @return the tracked item
     */
    public T getItem();

    /**
     * Check if this is a new item
     * @return true if item is new
     */
    public boolean isNewItem();

    /**
     * Get current count
     * @return current count value
     */
    public long getCount();

    /**
     * Set new item flag
     * @param newItem whether item is new
     */
    public void setNewItem(final boolean newItem);

    /**
     * Compare by count (for sorting)
     * @param o other ScoredItem
     * @return comparison result
     */
    public int compareTo(final ScoredItem<T> o);
}
```
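The point of an atomic `addAndGetCount` is that concurrent increments are never lost. The hypothetical `AtomicScore` below sketches that idea with `java.util.concurrent.atomic.AtomicLong`; it is not the library's source, and its descending `compareTo` is a choice made for this sketch so that sorting puts the largest count first.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical ScoredItem-like holder (not the library source): count and
// error live in AtomicLongs so concurrent updates are never lost.
class AtomicScore<T> implements Comparable<AtomicScore<T>> {
    private final T item;
    private final AtomicLong count;
    private final AtomicLong error;

    AtomicScore(T item, long count, long error) {
        this.item = item;
        this.count = new AtomicLong(count);
        this.error = new AtomicLong(error);
    }

    // Atomically adds delta and returns the updated count.
    long addAndGetCount(long delta) {
        return count.addAndGet(delta);
    }

    long getCount() {
        return count.get();
    }

    long getError() {
        return error.get();
    }

    void setError(long newError) {
        error.set(newError);
    }

    T getItem() {
        return item;
    }

    // Descending by count (a choice made for this sketch), so sorting puts
    // the largest count first.
    @Override
    public int compareTo(AtomicScore<T> o) {
        return Long.compare(o.getCount(), getCount());
    }
}
```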

### StochasticTopper

Stochastic algorithm for finding the most frequent items using reservoir sampling.

```java { .api }
/**
 * Stochastic top-K algorithm using reservoir sampling
 */
public class StochasticTopper<T> implements ITopK<T> {
    /**
     * Create stochastic topper with sample size
     * @param sampleSize size of the internal sample
     */
    public StochasticTopper(int sampleSize);

    /**
     * Create stochastic topper with sample size and seed
     * @param sampleSize size of the internal sample
     * @param seed random seed for reproducible results
     */
    public StochasticTopper(int sampleSize, Long seed);

    /**
     * Add item with count
     * @param item item to add
     * @param incrementCount count to add
     * @return true if item was processed
     */
    public boolean offer(T item, int incrementCount);

    /**
     * Add single item
     * @param item item to add
     * @return true if item was processed
     */
    public boolean offer(T item);

    /**
     * Get top k items
     * @param k number of items to return
     * @return list of top k items
     */
    public List<T> peek(int k);
}
```
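The sketch below illustrates the general technique named above: reservoir sampling (the classic Algorithm R) keeps a uniform random sample of the stream, and items are then ranked by how often they appear in that sample. `ReservoirTopK` is a hypothetical class and StochasticTopper's actual internals may differ; the stream length is kept in an `int` here for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical reservoir-sampling top-k (Algorithm R); not the stream-lib source.
class ReservoirTopK<T> {
    private final List<T> reservoir = new ArrayList<>();
    private final int sampleSize;
    private final Random random;
    private int seen = 0; // stream length so far (int for simplicity)

    ReservoirTopK(int sampleSize, long seed) {
        this.sampleSize = sampleSize;
        this.random = new Random(seed); // fixed seed gives a reproducible sample
    }

    void offer(T item) {
        seen++;
        if (reservoir.size() < sampleSize) {
            reservoir.add(item); // fill phase: keep the first sampleSize items
        } else {
            // Keep the new item with probability sampleSize / seen,
            // overwriting a uniformly chosen slot.
            int j = random.nextInt(seen);
            if (j < sampleSize) {
                reservoir.set(j, item);
            }
        }
    }

    // Ranks distinct items by how often they occur in the sample.
    List<T> peek(int k) {
        Map<T, Integer> freq = new HashMap<>();
        for (T item : reservoir) {
            freq.merge(item, 1, Integer::sum);
        }
        List<T> items = new ArrayList<>(freq.keySet());
        items.sort((a, b) -> Integer.compare(freq.get(b), freq.get(a)));
        return new ArrayList<>(items.subList(0, Math.min(k, items.size())));
    }
}
```

Memory stays at `sampleSize` items regardless of stream length, but the ranking is only as good as the sample: an item must be frequent enough to show up in it repeatedly.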

### ISampleSet Interface

Interface for sample set operations used in some stream summarization algorithms.

```java { .api }
/**
 * Interface for sample set operations
 */
public interface ISampleSet<T> {
    /**
     * Add element to sample set
     * @param element element to add
     * @return count after addition
     */
    long put(T element);

    /**
     * Add element with count to sample set
     * @param element element to add
     * @param incrementCount count to add
     * @return count after addition
     */
    long put(T element, int incrementCount);

    /**
     * Remove random element from sample set
     * @return removed element, or null if empty
     */
    T removeRandom();

    /**
     * Get top element without removing
     * @return top element, or null if empty
     */
    T peek();

    /**
     * Get top k elements without removing
     * @param k number of elements to return
     * @return list of top k elements
     */
    List<T> peek(int k);

    /**
     * Get current size
     * @return number of unique elements
     */
    int size();

    /**
     * Get total count
     * @return sum of all element counts
     */
    long count();
}
```

### SampleSet

Implementation of ISampleSet with frequency-based ordering.

```java { .api }
/**
 * Sample set implementation with frequency-based ordering
 */
public class SampleSet<T> implements ISampleSet<T> {
    /**
     * Create sample set with default capacity (7)
     */
    public SampleSet();

    /**
     * Create sample set with specified capacity
     * @param capacity maximum number of elements to track
     */
    public SampleSet(int capacity);

    /**
     * Create sample set with capacity and custom random generator
     * @param capacity maximum number of elements to track
     * @param random random number generator to use
     */
    public SampleSet(int capacity, Random random);
}
```
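As a rough model of the ISampleSet operations, the hypothetical `FrequencySampleSet` below keeps a count per element, reports elements by descending count, and removes one occurrence at random. Picking the removed element with probability proportional to its count is an assumption of this sketch, not documented behavior of SampleSet, and the sketch assumes positive increments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical frequency-ordered multiset modeled on the ISampleSet operations.
class FrequencySampleSet<T> {
    private final Map<T, Long> counts = new HashMap<>();
    private final Random random;
    private long total = 0; // sum of all counts

    FrequencySampleSet(Random random) {
        this.random = random;
    }

    // Adds incrementCount (assumed positive) occurrences; returns the new count.
    long put(T element, int incrementCount) {
        total += incrementCount;
        return counts.merge(element, (long) incrementCount, Long::sum);
    }

    // Removes one occurrence; each element is picked with probability
    // proportional to its count (an assumption of this sketch).
    T removeRandom() {
        if (total == 0) {
            return null;
        }
        long target = Math.min(total - 1, (long) (random.nextDouble() * total));
        T chosen = null;
        for (Map.Entry<T, Long> e : counts.entrySet()) {
            target -= e.getValue();
            if (target < 0) {
                chosen = e.getKey();
                break;
            }
        }
        long remaining = counts.get(chosen) - 1;
        if (remaining == 0) {
            counts.remove(chosen);
        } else {
            counts.put(chosen, remaining);
        }
        total--;
        return chosen;
    }

    // Elements ordered by descending count.
    List<T> peek(int k) {
        List<T> items = new ArrayList<>(counts.keySet());
        items.sort((a, b) -> Long.compare(counts.get(b), counts.get(a)));
        return new ArrayList<>(items.subList(0, Math.min(k, items.size())));
    }

    int size() {
        return counts.size();
    }

    long count() {
        return total;
    }
}
```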

## Usage Patterns

### Basic Top-K Tracking

```java
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;
import java.util.List;

// Track up to 20 candidate items; dataStream is any Iterable<String>
StreamSummary<String> topItems = new StreamSummary<>(20);

// Process stream data
for (String item : dataStream) {
    topItems.offer(item);
}

// Get the top 10 with estimated counts and error bounds
List<Counter<String>> top10 = topItems.topK(10);

for (Counter<String> counter : top10) {
    // The true count lies in [count - error, count]
    System.out.printf("%s: %d (error <= %d)%n",
            counter.getItem(),
            counter.getCount(),
            counter.getError());
}
```

### Heavy Hitters Detection

```java
import com.clearspring.analytics.stream.Counter;
import com.clearspring.analytics.stream.StreamSummary;
import java.util.List;

// With capacity m, every item whose frequency exceeds N/m is guaranteed
// to be tracked, so capacity 100 suits a 1% threshold
StreamSummary<String> heavyHitters = new StreamSummary<>(100);
long totalCount = 0;
double threshold = 0.01; // 1% threshold

for (String item : dataStream) {
    heavyHitters.offer(item);
    totalCount++;

    // Periodically check for heavy hitters
    if (totalCount % 10000 == 0) {
        // Scan every tracked counter, not just the top 10
        List<Counter<String>> candidates = heavyHitters.topK(heavyHitters.getCapacity());

        for (Counter<String> counter : candidates) {
            // count - error is a guaranteed lower bound (no false positives);
            // use getCount() alone to avoid false negatives instead
            double frequency = (double) (counter.getCount() - counter.getError()) / totalCount;
            if (frequency >= threshold) {
                System.out.printf("Heavy hitter: %s (>= %.2f%%)%n",
                        counter.getItem(), frequency * 100);
            }
        }
    }
}
```

### Concurrent Top-K Tracking

```java
import com.clearspring.analytics.stream.ConcurrentStreamSummary;
import com.clearspring.analytics.stream.ScoredItem;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Thread-safe version for concurrent access
ConcurrentStreamSummary<String> concurrentSummary =
    new ConcurrentStreamSummary<>(50);

// Multiple threads can safely add items; partitions is a placeholder
// List<List<String>> holding one slice of the input per worker
ExecutorService executor = Executors.newFixedThreadPool(4);

for (List<String> partition : partitions) {
    executor.submit(() -> {
        for (String item : partition) {
            concurrentSummary.offer(item);
        }
    });
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // throws InterruptedException

// Get results
List<ScoredItem<String>> topItems = concurrentSummary.peekWithScores(10);
```

### Stochastic Sampling for Large Streams

```java
import com.clearspring.analytics.stream.StochasticTopper;
import java.util.List;

// Use the stochastic approach for very large streams
StochasticTopper<String> sampler = new StochasticTopper<>(1000);

// Process a massive stream with a fixed-size sample
for (String item : massiveDataStream) {
    sampler.offer(item);
}

// Get approximate top items
List<String> approximateTop10 = sampler.peek(10);

System.out.println("Approximate top items: " + approximateTop10);
```

### Stream Summary Persistence

```java
import com.clearspring.analytics.stream.StreamSummary;

StreamSummary<String> summary = new StreamSummary<>(100);

// Process batch 1
for (String item : batch1) {
    summary.offer(item);
}

// Serialize state (toBytes() throws IOException); items should be Serializable
byte[] serialized = summary.toBytes();
saveToDatabase("stream_summary_state", serialized); // placeholder for your storage layer

// Later, restore and continue
byte[] restored = loadFromDatabase("stream_summary_state"); // placeholder
StreamSummary<String> restoredSummary = new StreamSummary<>(restored);

// Continue processing
for (String item : batch2) {
    restoredSummary.offer(item);
}
```
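For state you serialize yourself, the same byte-array round trip can be done with standard Java serialization. `SummarySnapshot` below is a hypothetical holder; its byte format has nothing to do with the format StreamSummary's `toBytes()` produces, and `roundTrip` wraps the checked exceptions only to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical snapshot of summary state, persisted with Java serialization.
class SummarySnapshot implements Serializable {
    private static final long serialVersionUID = 1L;
    final int capacity;
    final HashMap<String, Long> counts = new HashMap<>();

    SummarySnapshot(int capacity) {
        this.capacity = capacity;
    }

    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        }
        return bytes.toByteArray();
    }

    static SummarySnapshot fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SummarySnapshot) in.readObject();
        }
    }

    // Serializes and immediately restores; checked exceptions become unchecked.
    static SummarySnapshot roundTrip(SummarySnapshot snapshot) {
        try {
            return fromBytes(snapshot.toBytes());
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The restored object is a new instance with the same capacity and counts, which is the property the persistence pattern above relies on.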

### Capacity Management and Monitoring

```java
import com.clearspring.analytics.stream.StreamSummary;

StreamSummary<String> summary = new StreamSummary<>(50);

for (String item : dataStream) {
    String dropped = summary.offerReturnDropped(item, 1);

    if (dropped != null) {
        // Another item was evicted to make room for this one
        System.out.println("Dropped item: " + dropped);

        // Record eviction metrics (updateDroppedItemMetrics is a placeholder);
        // frequent evictions suggest choosing a larger capacity next time,
        // since capacity is fixed at construction
        updateDroppedItemMetrics(dropped);
    }
}

// Monitor current capacity usage
System.out.println("Items tracked: " + summary.size() + "/" + summary.getCapacity());
```

## Algorithm Selection Guidelines

### StreamSummary vs StochasticTopper

**Use StreamSummary when**:
- Guaranteed (deterministic) error bounds are needed
- Memory must be bounded by a fixed number of counters
- Results must be reproducible without a random seed
- You need per-item frequency estimates, not just a ranking

**Use StochasticTopper when**:
- Approximate results are acceptable
- Very large streams need processing and only a ranking is required
- Memory must be limited to a fixed sample size
- Randomized results are acceptable (pass a seed for reproducibility)

### Concurrent vs Single-threaded

**Use ConcurrentStreamSummary when**:
- Multiple threads need to add items simultaneously
- Thread safety is required without external locking
- Some overhead under contention is acceptable

**Use regular StreamSummary when**:
- Access is single-threaded
- Maximum single-thread performance is needed
- Callers already synchronize access externally
623
## Performance Characteristics
624
625
**StreamSummary**:
626
- Space: O(capacity)
627
- Insert: O(log capacity)
628
- Query: O(k) for top-k
629
- Guarantees: Exact error bounds
630
631
**ConcurrentStreamSummary**:
632
- Space: O(capacity)
633
- Insert: O(log capacity) with synchronization overhead
634
- Query: O(k) for top-k with synchronization
635
- Thread-safe operations
636
637
**StochasticTopper**:
638
- Space: O(sample size)
639
- Insert: O(1) amortized
640
- Query: O(k log k) for top-k
641
- Probabilistic accuracy

## Error Bound Guarantees

StreamSummary (Space-Saving) never underestimates. For any tracked item with true frequency `f`, the estimated frequency `f'` returned by `getCount()` satisfies:

`f ≤ f' ≤ f + ε`

where `ε` is that counter's `getError()` value, and every `ε` is at most `N / capacity`, with `N` the total count of all items offered so far.

This means `f' - ε` is a guaranteed lower bound on the true frequency, the possible overestimate shrinks as capacity grows, and any item whose true frequency exceeds `N / capacity` is guaranteed to be tracked.
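A worked example with illustrative numbers, using the Space-Saving relation `f' - ε ≤ f ≤ f'`: a stream of `N` = 1,000,000 offers, a summary of capacity `m` = 1,000, and one reported counter with `getCount()` = 5,400 and `getError()` = 300.

```latex
\begin{aligned}
N &= 1\,000\,000,\quad m = 1\,000 &&\Rightarrow\ \varepsilon \le N/m = 1\,000 \\
f' &= 5\,400,\quad \varepsilon = 300 &&\Rightarrow\ 5\,100 \le f \le 5\,400 \\
f &> N/m = 1\,000 &&\Rightarrow\ \text{the item is guaranteed to be tracked}
\end{aligned}
```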