tessl/maven-com-clearspring-analytics--stream

A library for summarizing data in streams for which it is infeasible to store all events

Stream Summarization and Top-K

Algorithms for tracking the most frequent items and maintaining stream summaries with error bounds. These data structures efficiently identify heavy hitters and maintain top-K lists in streaming data scenarios.

Capabilities

ITopK Interface

Common interface for top-K tracking algorithms.

/**
 * Interface for top-K element tracking
 */
public interface ITopK<T> {
    /**
     * Add single element to the tracker
     * @param element element to add
     * @return true if the element was added or updated
     */
    boolean offer(T element);
    
    /**
     * Add element with specified count increment
     * @param element element to add
     * @param incrementCount count to add for this element
     * @return true if the element was added or updated
     */
    boolean offer(T element, int incrementCount);
    
    /**
     * Get top k elements
     * @param k number of top elements to return
     * @return list of top k elements in descending order by frequency
     */
    List<T> peek(int k);
}

StreamSummary

Space-Saving algorithm implementation for maintaining stream summaries and tracking top-K most frequent elements with error bounds.

/**
 * Space-Saving algorithm for stream summarization and top-K tracking
 */
public class StreamSummary<T> implements ITopK<T>, Externalizable {
    /**
     * Create StreamSummary with specified capacity
     * @param capacity maximum number of items to track
     */
    public StreamSummary(int capacity);
    
    /**
     * Create empty StreamSummary for deserialization
     */
    public StreamSummary();
    
    /**
     * Create StreamSummary from serialized bytes
     * @param bytes serialized StreamSummary data
     * @throws IOException if deserialization fails
     */
    public StreamSummary(byte[] bytes) throws IOException;
    
    /**
     * Get capacity of the summary
     * @return maximum number of items tracked
     */
    public int getCapacity();
    
    /**
     * Add item to summary
     * @param item item to add
     * @return true if item was added or updated
     */
    public boolean offer(T item);
    
    /**
     * Add item with count to summary
     * @param item item to add
     * @param incrementCount count to add
     * @return true if item was added or updated
     */
    public boolean offer(T item, int incrementCount);
    
    /**
     * Add item and return dropped item if capacity exceeded
     * @param item item to add
     * @param incrementCount count to add
     * @return item that was dropped, or null if none
     */
    public T offerReturnDropped(T item, int incrementCount);
    
    /**
     * Add item and return both addition status and dropped item
     * @param item item to add
     * @param incrementCount count to add
     * @return Pair containing (wasAdded, droppedItem)
     */
    public Pair<Boolean, T> offerReturnAll(T item, int incrementCount);
    
    /**
     * Get top k items
     * @param k number of items to return
     * @return list of top k items
     */
    public List<T> peek(int k);
    
    /**
     * Get top k items with their counters (including error bounds)
     * @param k number of items to return
     * @return list of Counter objects with items, counts, and error bounds
     */
    public List<Counter<T>> topK(int k);
    
    /**
     * Get current size (number of tracked items)
     * @return current number of tracked items
     */
    public int size();
    
    /**
     * Deserialize from byte array
     * @param bytes serialized data
     * @throws IOException if deserialization fails
     */
    public void fromBytes(byte[] bytes) throws IOException;
    
    /**
     * Serialize to byte array
     * @return serialized data
     * @throws IOException if serialization fails
     */
    public byte[] toBytes() throws IOException;
}

Usage Examples:

import com.clearspring.analytics.stream.StreamSummary;
import com.clearspring.analytics.stream.Counter;
import java.util.List;

// Create summary to track top 100 items
StreamSummary<String> summary = new StreamSummary<>(100);

// Process stream data
summary.offer("apple");
summary.offer("banana");  
summary.offer("apple");      // apple count is now 2
summary.offer("cherry", 5);  // add cherry with count 5

// Get top items
List<String> top10 = summary.peek(10);
List<Counter<String>> top10WithCounts = summary.topK(10);

// Print results with error bounds
for (Counter<String> counter : top10WithCounts) {
    System.out.println(counter.getItem() + ": " + 
                      counter.getCount() + " (±" + counter.getError() + ")");
}

ConcurrentStreamSummary

Thread-safe version of StreamSummary for concurrent access scenarios.

/**
 * Thread-safe version of StreamSummary
 */
public class ConcurrentStreamSummary<T> implements ITopK<T> {
    /**
     * Create concurrent stream summary with specified capacity
     * @param capacity maximum number of items to track
     */
    public ConcurrentStreamSummary(final int capacity);
    
    /**
     * Add element (thread-safe)
     * @param element element to add
     * @return true if element was added or updated
     */
    public boolean offer(final T element);
    
    /**
     * Add element with count (thread-safe)
     * @param element element to add
     * @param incrementCount count to add
     * @return true if element was added or updated
     */
    public boolean offer(final T element, final int incrementCount);
    
    /**
     * Get top k elements (thread-safe)
     * @param k number of elements to return
     * @return list of top k elements
     */
    public List<T> peek(final int k);
    
    /**
     * Get top k elements with scores (thread-safe)
     * @param k number of elements to return
     * @return list of ScoredItem objects with counts and error bounds
     */
    public List<ScoredItem<T>> peekWithScores(final int k);
}

Counter

Represents a counter for tracking item frequency with error bounds.

/**
 * Counter with item, count, and error bound information
 */
public class Counter<T> implements Externalizable {
    /**
     * Create empty counter for deserialization
     */
    public Counter();
    
    /**
     * Get the tracked item
     * @return the item being counted
     */
    public T getItem();
    
    /**
     * Get the count value
     * @return current count for the item
     */
    public long getCount();
    
    /**
     * Get the error bound
     * @return maximum possible error in the count
     */
    public long getError();
    
    /**
     * String representation
     */
    public String toString();
}

ScoredItem

Thread-safe item with count and error tracking for concurrent operations.

/**
 * Thread-safe item with atomic count operations
 */
public class ScoredItem<T> implements Comparable<ScoredItem<T>> {
    /**
     * Create scored item with count and error
     * @param item the item
     * @param count initial count
     * @param error initial error bound
     */
    public ScoredItem(final T item, final long count, final long error);
    
    /**
     * Create scored item with count (error defaults to 0)
     * @param item the item
     * @param count initial count
     */
    public ScoredItem(final T item, final long count);
    
    /**
     * Atomically add to count and return new value
     * @param delta amount to add
     * @return new count value
     */
    public long addAndGetCount(final long delta);
    
    /**
     * Set error bound
     * @param newError new error value
     */
    public void setError(final long newError);
    
    /**
     * Get error bound
     * @return current error bound
     */
    public long getError();
    
    /**
     * Get the item
     * @return the tracked item
     */
    public T getItem();
    
    /**
     * Check if this is a new item
     * @return true if item is new
     */
    public boolean isNewItem();
    
    /**
     * Get current count
     * @return current count value
     */
    public long getCount();
    
    /**
     * Set new item flag
     * @param newItem whether item is new
     */
    public void setNewItem(final boolean newItem);
    
    /**
     * Compare by count (for sorting)
     * @param o other ScoredItem
     * @return comparison result
     */
    public int compareTo(final ScoredItem<T> o);
}

StochasticTopper

Stochastic algorithm for finding most frequent items using reservoir sampling techniques.

/**
 * Stochastic top-K algorithm using reservoir sampling
 */
public class StochasticTopper<T> implements ITopK<T> {
    /**
     * Create stochastic topper with sample size
     * @param sampleSize size of the internal sample
     */
    public StochasticTopper(int sampleSize);
    
    /**
     * Create stochastic topper with sample size and seed
     * @param sampleSize size of the internal sample
     * @param seed random seed for reproducible results
     */
    public StochasticTopper(int sampleSize, Long seed);
    
    /**
     * Add item with count
     * @param item item to add
     * @param incrementCount count to add
     * @return true if item was processed
     */
    public boolean offer(T item, int incrementCount);
    
    /**
     * Add single item
     * @param item item to add
     * @return true if item was processed
     */
    public boolean offer(T item);
    
    /**
     * Get top k items
     * @param k number of items to return
     * @return list of top k items
     */
    public List<T> peek(int k);
}

ISampleSet Interface

Interface for sample set operations used in some stream summarization algorithms.

/**
 * Interface for sample set operations
 */
public interface ISampleSet<T> {
    /**
     * Add element to sample set
     * @param element element to add
     * @return count after addition
     */
    long put(T element);
    
    /**
     * Add element with count to sample set
     * @param element element to add
     * @param incrementCount count to add
     * @return count after addition
     */
    long put(T element, int incrementCount);
    
    /**
     * Remove random element from sample set
     * @return removed element, or null if empty
     */
    T removeRandom();
    
    /**
     * Get top element without removing
     * @return top element, or null if empty
     */
    T peek();
    
    /**
     * Get top k elements without removing
     * @param k number of elements to return
     * @return list of top k elements
     */
    List<T> peek(int k);
    
    /**
     * Get current size
     * @return number of unique elements
     */
    int size();
    
    /**
     * Get total count
     * @return sum of all element counts
     */
    long count();
}

SampleSet

Implementation of ISampleSet with frequency-based ordering.

/**
 * Sample set implementation with frequency-based ordering
 */
public class SampleSet<T> implements ISampleSet<T> {
    /**
     * Create sample set with default capacity (7)
     */
    public SampleSet();
    
    /**
     * Create sample set with specified capacity
     * @param capacity maximum number of elements to track
     */
    public SampleSet(int capacity);
    
    /**
     * Create sample set with capacity and custom random generator
     * @param capacity maximum number of elements to track
     * @param random random number generator to use
     */
    public SampleSet(int capacity, Random random);
}

Usage Patterns

Basic Top-K Tracking

// Track top 20 most frequent items
StreamSummary<String> topItems = new StreamSummary<>(20);

// Process stream data
for (String item : dataStream) {
    topItems.offer(item);
}

// Get top 10 with counts and error bounds
List<Counter<String>> top10 = topItems.topK(10);

for (Counter<String> counter : top10) {
    System.out.printf("%s: %d (±%d)%n", 
                     counter.getItem(), 
                     counter.getCount(), 
                     counter.getError());
}

Heavy Hitters Detection

StreamSummary<String> heavyHitters = new StreamSummary<>(100);
long totalCount = 0;
double threshold = 0.01; // 1% threshold

for (String item : dataStream) {
    heavyHitters.offer(item);
    totalCount++;
    
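    // Periodically check for heavy hitters
    if (totalCount % 10000 == 0) {
        // Inspect every tracked counter: an item above the threshold
        // is not necessarily among the first 10
        List<Counter<String>> candidates = heavyHitters.topK(heavyHitters.size());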
        
        for (Counter<String> counter : candidates) {
            double frequency = (double) counter.getCount() / totalCount;
            if (frequency >= threshold) {
                System.out.println("Heavy hitter: " + counter.getItem() + 
                                 " (" + String.format("%.2f%%", frequency * 100) + ")");
            }
        }
    }
}
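
The check above compares the raw count against the threshold. Under the usual Space-Saving convention, a counter's count may include up to `getError()` occurrences inherited from an evicted item, so `count - error` is the conservative figure. The sketch below separates guaranteed from merely possible heavy hitters on that assumption; `HeavyHitterCheck`, `Verdict` and `classify` are hypothetical names for illustration, not part of the library.

```java
/** Hypothetical helper; names are illustrative and not part of stream-lib. */
final class HeavyHitterCheck {
    enum Verdict { GUARANTEED, POSSIBLE, NO }

    /**
     * Classifies one counter, assuming the true frequency lies in
     * [count - error, count] (the usual Space-Saving convention).
     */
    static Verdict classify(long count, long error, long total, double threshold) {
        if (total <= 0) {
            return Verdict.NO;
        }
        double upper = (double) count / total;
        double lower = (double) Math.max(0L, count - error) / total;
        if (lower >= threshold) {
            return Verdict.GUARANTEED; // even the pessimistic count clears the bar
        }
        if (upper >= threshold) {
            return Verdict.POSSIBLE;   // clears the bar only if the error is not real
        }
        return Verdict.NO;
    }

    public static void main(String[] args) {
        System.out.println(classify(150, 20, 10_000, 0.01)); // GUARANTEED (1.3% to 1.5%)
        System.out.println(classify(150, 80, 10_000, 0.01)); // POSSIBLE   (0.7% to 1.5%)
        System.out.println(classify(50, 0, 10_000, 0.01));   // NO         (0.5%)
    }
}
```

In the loop above, the arguments would come from `counter.getCount()`, `counter.getError()` and `totalCount`.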

Concurrent Top-K Tracking

// Thread-safe version for concurrent access
ConcurrentStreamSummary<String> concurrentSummary = 
    new ConcurrentStreamSummary<>(50);

// Multiple threads can safely add items
ExecutorService executor = Executors.newFixedThreadPool(4);

for (int i = 0; i < 4; i++) {
    executor.submit(() -> {
        // threadLocalData is a placeholder for the items this worker processes
        for (String item : threadLocalData) {
            concurrentSummary.offer(item);
        }
    });
}

executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

// Get results
List<ScoredItem<String>> topItems = concurrentSummary.peekWithScores(10);

Stochastic Sampling for Large Streams

// Use stochastic approach for very large streams
StochasticTopper<String> sampler = new StochasticTopper<>(1000);

// Process massive stream efficiently
for (String item : massiveDataStream) {
    sampler.offer(item);
}

// Get approximate top items
List<String> approximateTop10 = sampler.peek(10);

System.out.println("Approximate top items: " + approximateTop10);

Stream Summary Persistence

StreamSummary<String> summary = new StreamSummary<>(100);

// Process batch 1
for (String item : batch1) {
    summary.offer(item);
}

// Serialize state
byte[] serialized = summary.toBytes();
saveToDatabase("stream_summary_state", serialized);

// Later, restore and continue
byte[] restored = loadFromDatabase("stream_summary_state");
StreamSummary<String> restoredSummary = new StreamSummary<>(restored);

// Continue processing
for (String item : batch2) {
    restoredSummary.offer(item);
}

Capacity Management and Monitoring

StreamSummary<String> summary = new StreamSummary<>(50);

for (String item : dataStream) {
    String dropped = summary.offerReturnDropped(item, 1);
    
    if (dropped != null) {
        // Item was dropped due to capacity limit
        System.out.println("Dropped item: " + dropped);
        
        // Could log metrics, adjust capacity, etc.
        updateDroppedItemMetrics(dropped);
    }
}

// Monitor current capacity usage
System.out.println("Items tracked: " + summary.size() + "/" + summary.getCapacity());

Algorithm Selection Guidelines

StreamSummary vs StochasticTopper

Use StreamSummary when:

  • Per-item error bounds are needed
  • Memory usage must be predictable and bounded
  • Deterministic results are required
  • Items must be reported together with their estimated frequencies

Use StochasticTopper when:

  • Approximate results are acceptable
  • Very large streams need processing
  • Memory must be strictly limited
  • Randomization is acceptable

Concurrent vs Single-threaded

Use ConcurrentStreamSummary when:

  • Multiple threads need to add items simultaneously
  • Thread safety is required
  • Some synchronization overhead under contention is acceptable

Use regular StreamSummary when:

  • Access is single-threaded
  • Maximum performance is needed
  • External synchronization is available
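
External synchronization can be as simple as routing every call through one lock. The sketch below shows that pattern without depending on the library: `TopK` is a local stand-in that mirrors the shape of `ITopK`, `ExactTopK` is a deliberately naive, non-thread-safe tracker used only as something to wrap, and `SynchronizedTopK` and `SyncDemo` are hypothetical names. In real code the delegate would be a `StreamSummary`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Demo: four threads share one tracker through a locking wrapper. */
final class SyncDemo {
    /** Each of four threads offers "x" 1000 times; returns the final count of "x". */
    static long run() {
        ExactTopK<String> exact = new ExactTopK<>();
        TopK<String> shared = new SynchronizedTopK<>(exact);
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    shared.offer("x");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join(); // join makes the workers' updates visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return exact.countOf("x");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 4000
    }
}

/** Local stand-in mirroring part of ITopK so the sketch compiles on its own. */
interface TopK<T> {
    boolean offer(T element);
    List<T> peek(int k);
}

/** Exact, non-thread-safe tracker; a placeholder for a single-threaded summary. */
final class ExactTopK<T> implements TopK<T> {
    private final Map<T, Long> counts = new HashMap<>();

    @Override
    public boolean offer(T element) {
        counts.merge(element, 1L, Long::sum);
        return true;
    }

    @Override
    public List<T> peek(int k) {
        List<Map.Entry<T, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<T> out = new ArrayList<>();
        for (int i = 0; i < Math.min(k, entries.size()); i++) {
            out.add(entries.get(i).getKey());
        }
        return out;
    }

    long countOf(T element) {
        return counts.getOrDefault(element, 0L);
    }
}

/** Serializes every call to the delegate through a single lock. */
final class SynchronizedTopK<T> implements TopK<T> {
    private final TopK<T> delegate;
    private final Object lock = new Object();

    SynchronizedTopK(TopK<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean offer(T element) {
        synchronized (lock) {
            return delegate.offer(element);
        }
    }

    @Override
    public List<T> peek(int k) {
        synchronized (lock) {
            return delegate.peek(k);
        }
    }
}
```

A single lock keeps the wrapper trivially correct but serializes all writers, which is the trade-off to weigh against ConcurrentStreamSummary.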

Performance Characteristics

StreamSummary:

  • Space: O(capacity)
  • Insert: O(log capacity)
  • Query: O(k) for top-k
  • Guarantees: per-item error bounds, reported with each count

ConcurrentStreamSummary:

  • Space: O(capacity)
  • Insert: O(log capacity) with synchronization overhead
  • Query: O(k) for top-k with synchronization
  • Thread-safe operations

StochasticTopper:

  • Space: O(sample size)
  • Insert: O(1) amortized
  • Query: O(k log k) for top-k
  • Probabilistic accuracy

Error Bound Guarantees

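StreamSummary follows the Space-Saving algorithm, in which the count of a tracked item can overestimate its true frequency but never underestimate it. For any tracked item with true frequency f, the estimated frequency f' satisfies:

f ≤ f' ≤ f + ε

Where ε = total_stream_size / capacity is the maximum possible error.

This means the algorithm never underestimates the frequency of a tracked item, and the overestimation is bounded by the total number of items seen divided by the capacity. Each Counter reports its own bound through getError(), so getCount() - getError() is a lower bound on the item's true frequency.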
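
To make the relation between a reported count, its error value and the true frequency concrete, here is a minimal sketch of the textbook Space-Saving update rule. It is not the library's implementation: `SpaceSavingSketch` and its methods are illustrative names, it uses a plain `HashMap` with a linear scan for the minimum, and it handles unit increments only.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative Space-Saving table; not the library's StreamSummary. */
final class SpaceSavingSketch {
    /** Per item: [0] = estimated count, [1] = error (possible overestimate). */
    private final Map<String, long[]> counters = new HashMap<>();
    private final int capacity;

    SpaceSavingSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    void offer(String item) {
        long[] c = counters.get(item);
        if (c != null) {
            c[0]++;                                   // already tracked: increment
        } else if (counters.size() < capacity) {
            counters.put(item, new long[] {1, 0});    // free slot: count is exact
        } else {
            // Table is full: evict a counter with the minimum count.
            String minItem = null;
            long minCount = Long.MAX_VALUE;
            for (Map.Entry<String, long[]> e : counters.entrySet()) {
                if (e.getValue()[0] < minCount) {
                    minCount = e.getValue()[0];
                    minItem = e.getKey();
                }
            }
            counters.remove(minItem);
            // The newcomer inherits the evicted count, recorded as its error.
            counters.put(item, new long[] {minCount + 1, minCount});
        }
    }

    long count(String item) {
        long[] c = counters.get(item);
        return c == null ? 0 : c[0];
    }

    long error(String item) {
        long[] c = counters.get(item);
        return c == null ? 0 : c[1];
    }

    boolean isTracked(String item) {
        return counters.containsKey(item);
    }

    public static void main(String[] args) {
        SpaceSavingSketch s = new SpaceSavingSketch(2);
        for (String x : new String[] {"a", "a", "a", "b", "c", "a"}) {
            s.offer(x);
        }
        // "a" truly occurs 4 times, "c" once; "b" was evicted when "c" arrived.
        System.out.println("a: count=" + s.count("a") + " error=" + s.error("a")); // a: count=4 error=0
        System.out.println("c: count=" + s.count("c") + " error=" + s.error("c")); // c: count=2 error=1
    }
}
```

In this run "c" is reported with count 2 although it occurred once; its error of 1 brackets the true value between count - error = 1 and count = 2.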

Install with Tessl CLI

npx tessl i tessl/maven-com-clearspring-analytics--stream
