
tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases


Utility Classes

Shared utility classes and data generators used across multiple examples. They provide common functionality for rate limiting, data generation, and stream control.

Capabilities

ThrottledIterator

Rate-limited iterator for controlling data emission speed in streaming examples.

/**
 * Iterator that supports throttling the emission rate
 * Controls data flow rate for testing and demonstration purposes
 * @param <T> Type of elements being iterated
 */
public class ThrottledIterator<T> implements Iterator<T>, Serializable {
    
    /**
     * Creates a throttled iterator with specified emission rate
     * @param source Source iterator to wrap (must be non-null and Serializable)
     * @param elementsPerSecond Target emission rate in elements per second (must be at least 1)
     * @throws NullPointerException if source is null
     * @throws IllegalArgumentException if source is not Serializable or elementsPerSecond is less than 1
     */
    public ThrottledIterator(Iterator<T> source, long elementsPerSecond);
    
    /**
     * Checks if more elements are available
     * @return true if source has more elements
     */
    public boolean hasNext();
    
    /**
     * Returns next element with rate limiting applied
     * Blocks if necessary to maintain specified emission rate
     * @return Next element from source iterator
     */
    public T next();
    
    /**
     * Remove operation is not supported
     * @throws UnsupportedOperationException always
     */
    public void remove();
}

Rate Limiting Implementation

Throttling Logic

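The ThrottledIterator picks one of two throttling regimes from the requested rate: a timing check once per 50 ms batch at 100 elements per second or more, and a per-element delay below that: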

// Rate calculation for different throughput ranges
if (elementsPerSecond >= 100) {
    // High throughput: batch processing every 50ms
    this.sleepBatchSize = elementsPerSecond / 20;  // Elements per 50ms batch
    this.sleepBatchTime = 50;                      // 50ms batches
} else if (elementsPerSecond >= 1) {
    // Low throughput: per-element delays
    this.sleepBatchSize = 1;                       // One element at a time
    this.sleepBatchTime = 1000 / elementsPerSecond; // Delay per element
} else {
    throw new IllegalArgumentException("'elements per second' must be positive and not zero");
}
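Both branches above use integer division, so the requested rate is rounded before any timing happens. The following is a minimal sketch that mirrors the branch logic in a standalone helper; the `ThrottleParams` class and its `nominalRate` method are hypothetical names introduced here for illustration and are not part of Flink.

```java
/** Hypothetical helper mirroring the branch logic above; not part of Flink. */
final class ThrottleParams {
    final long batchSize;   // elements emitted between two timing checks
    final long batchTimeMs; // length of one batch window in milliseconds

    private ThrottleParams(long batchSize, long batchTimeMs) {
        this.batchSize = batchSize;
        this.batchTimeMs = batchTimeMs;
    }

    static ThrottleParams of(long elementsPerSecond) {
        if (elementsPerSecond >= 100) {
            // Integer division: 110 / 20 == 5, so the remainder is dropped
            return new ThrottleParams(elementsPerSecond / 20, 50);
        } else if (elementsPerSecond >= 1) {
            // Integer division: 1000 / 3 == 333
            return new ThrottleParams(1, 1000 / elementsPerSecond);
        }
        throw new IllegalArgumentException("'elements per second' must be positive and not zero");
    }

    /** Rate implied by the rounded parameters alone, in elements per second. */
    double nominalRate() {
        return batchSize * 1000.0 / batchTimeMs;
    }
}
```

For example, `ThrottleParams.of(110)` yields a batch size of 5 per 50 ms window, which corresponds to a nominal 100 elements per second rather than 110, and `ThrottleParams.of(99)` yields a 10 ms per-element delay, also a nominal 100 per second.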

Timing Control

@Override
public T next() {
    // Apply rate limiting delay if necessary
    if (lastBatchCheckTime > 0) {
        if (++num >= sleepBatchSize) {
            num = 0;
            
            final long now = System.currentTimeMillis();
            final long elapsed = now - lastBatchCheckTime;
            
            if (elapsed < sleepBatchTime) {
                try {
                    Thread.sleep(sleepBatchTime - elapsed);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            lastBatchCheckTime = now;
        }
    } else {
        lastBatchCheckTime = System.currentTimeMillis();
    }
    
    return source.next();
}
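To see how this logic behaves over several calls, the sketch below replays the same steps against a simulated clock instead of `System.currentTimeMillis()` and `Thread.sleep`. It assumes that the caller spends no time between calls and that sleeps are exact; `ThrottleTrace` and `emissionTimes` are hypothetical names, not part of Flink.

```java
/** Hypothetical simulation of the next() timing logic above; not part of Flink. */
final class ThrottleTrace {

    /** Returns the simulated time (ms) at which each of the first count calls to next() returns. */
    static long[] emissionTimes(int count, long sleepBatchSize, long sleepBatchTime) {
        long clock = 1;              // simulated current time; starts above 0 because 0 means "not started"
        long lastBatchCheckTime = 0;
        long num = 0;
        long[] times = new long[count];

        for (int i = 0; i < count; i++) {
            if (lastBatchCheckTime > 0) {
                if (++num >= sleepBatchSize) {
                    num = 0;
                    final long now = clock;
                    final long elapsed = now - lastBatchCheckTime;
                    if (elapsed < sleepBatchTime) {
                        clock += sleepBatchTime - elapsed; // stands in for Thread.sleep
                    }
                    lastBatchCheckTime = now;              // timestamp taken before the sleep
                }
            } else {
                lastBatchCheckTime = clock;
            }
            times[i] = clock;
        }
        return times;
    }
}
```

Under these assumptions, `ThrottleTrace.emissionTimes(5, 1, 500)` returns `{1, 501, 501, 1001, 1001}`. Because the stored timestamp is the one taken before sleeping, the check that follows a sleep already sees a full window elapsed and returns immediately. This suggests that observed throughput can exceed the nominal rate, so measure it rather than assume it when exact rates matter.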

Usage Patterns

Stream Source Rate Control

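// Control the emission rate of sample data.
// The wrapped iterator must implement java.io.Serializable. The iterator returned by
// Arrays.asList(...).iterator() does not, so the constructor would reject it.
// serializableSource stands for any Iterator<String> that also implements Serializable.
ThrottledIterator<String> throttledData = new ThrottledIterator<>(serializableSource, 10); // 10 elements/sec

// fromCollection(Iterator, Class) creates a non-parallel source from the iterator
DataStream<String> stream = env.fromCollection(throttledData, String.class);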

Join Example Integration

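// Throttled data sources for window join examples.
// ThrottledIterator is an Iterator, not a SourceFunction, so it is passed to
// fromCollection(Iterator, TypeInformation) rather than addSource.
// orangeIterator and greenIterator stand for Serializable iterators over the sample tuples.
TypeInformation<Tuple3<String, String, Long>> tupleType =
    TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {});

DataStream<Tuple3<String, String, Long>> orangeStream = env
    .fromCollection(new ThrottledIterator<>(orangeIterator, elementsPerSecond), tupleType)
    .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

DataStream<Tuple3<String, String, Long>> greenStream = env
    .fromCollection(new ThrottledIterator<>(greenIterator, elementsPerSecond), tupleType)
    .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());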

Load Testing Applications

// Generate controlled load for performance testing.
// testEvents stands for a Serializable Iterator<TestEvent> over the output of
// generateTestData(10000); a plain List.iterator() is typically not Serializable
// and would be rejected by the constructor.
ThrottledIterator<TestEvent> loadGenerator = new ThrottledIterator<>(
    testEvents,
    1000 // 1000 events per second
);

DataStream<TestEvent> loadStream = env.fromCollection(loadGenerator, TestEvent.class);

Configuration Examples

High Throughput Configuration

// High throughput: 1000 elements/second
// Batches of 50 elements every 50ms
ThrottledIterator<T> highThroughput = new ThrottledIterator<>(source, 1000);

// Results in:
// sleepBatchSize = 50 (1000/20)
// sleepBatchTime = 50ms
// Emits 50 elements, then sleeps for remaining time in 50ms window

Low Throughput Configuration

// Low throughput: 2 elements/second
// Individual element delays of 500ms
ThrottledIterator<T> lowThroughput = new ThrottledIterator<>(source, 2);

// Results in:
// sleepBatchSize = 1
// sleepBatchTime = 500ms (1000/2)
// Emits 1 element, then sleeps for whatever remains of the 500ms window

Testing Scenarios

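// Burst testing: effectively no throttling, because the batch size
// (Long.MAX_VALUE / 20) is too large for the timing check to be reached in practice
ThrottledIterator<T> burstMode = new ThrottledIterator<>(source, Long.MAX_VALUE);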

// Trickle mode: Very slow emission
ThrottledIterator<T> trickleMode = new ThrottledIterator<>(source, 1);

// Realistic rate: Moderate throughput
ThrottledIterator<T> realisticRate = new ThrottledIterator<>(source, 100);

Error Handling

Source Validation

public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
    this.source = requireNonNull(source);
    
    // Ensure source is serializable for Flink
    if (!(source instanceof Serializable)) {
        throw new IllegalArgumentException("source must be java.io.Serializable");
    }
    
    // Validate rate parameter
    if (elementsPerSecond < 1) {
        throw new IllegalArgumentException("'elements per second' must be positive and not zero");
    }
    
    // Configure rate limiting parameters...
}
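The Serializable check matters in practice because iterators obtained from standard collections, such as `Arrays.asList(...).iterator()`, do not implement `Serializable` and are rejected by the constructor. One way to satisfy the check is a small list-backed iterator like the sketch below. `SerializableListIterator` is a hypothetical helper written for this page, not a Flink class, and it assumes the elements themselves are serializable.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper, not part of Flink: a list-backed iterator that implements Serializable. */
final class SerializableListIterator<T> implements Iterator<T>, Serializable {
    private static final long serialVersionUID = 1L;

    private final ArrayList<T> elements; // ArrayList is itself Serializable
    private int position;                // index of the next element to return

    SerializableListIterator(List<T> elements) {
        this.elements = new ArrayList<>(elements); // defensive copy
    }

    @Override
    public boolean hasNext() {
        return position < elements.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return elements.get(position++);
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

An instance such as `new SerializableListIterator<>(Arrays.asList("data1", "data2", "data3"))` passes the `instanceof Serializable` check shown above, whereas `Arrays.asList("data1").iterator()` does not.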

Interrupt Handling

try {
    Thread.sleep(sleepBatchTime - elapsed);
} catch (InterruptedException e) {
    // Restore interrupt flag and proceed
    Thread.currentThread().interrupt();
}
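Restoring the flag means the interruption is not lost: `next()` still returns an element, and the calling code can detect the interrupt afterwards and decide whether to stop. A minimal sketch of the same pattern in isolation follows; `InterruptDemo` and `sleepPreservingInterrupt` are hypothetical names used only for illustration.

```java
/** Hypothetical demo of the interrupt-restoring pattern above; not part of Flink. */
final class InterruptDemo {

    /** Sleeps for the given time; if interrupted, restores the interrupt flag and returns early. */
    static void sleepPreservingInterrupt(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws, so set it again for the caller
            Thread.currentThread().interrupt();
        }
    }
}
```

A caller can then check `Thread.currentThread().isInterrupted()` after the call returns and exit its loop if the flag is set.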

Performance Characteristics

Memory Usage

  • Minimal State: Only tracks timing information and batch counters
  • No Buffering: Wraps existing iterator without copying data
  • Serializable: Can be distributed across Flink cluster nodes

Accuracy

  • Millisecond Precision: Uses System.currentTimeMillis() for timing
  • Batch Optimization: Groups elements for better performance at high rates
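  • Elapsed-Time Compensation: Each sleep is shortened by the time already spent since the last timing check; because that check is timestamped before the sleep, treat the configured rate as approximate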

Throughput Ranges

  • High Rate (≥100/sec): Batch-based processing for efficiency
  • Low Rate (1-99/sec): Per-element timing for precision
  • Invalid Rate (<1/sec): Throws IllegalArgumentException

Integration with Flink Sources

Custom Source Function

public class ThrottledSourceFunction<T> implements SourceFunction<T> {
    private static final long serialVersionUID = 1L;

    private final ThrottledIterator<T> throttledIterator;
    private volatile boolean isRunning = true;
    
    public ThrottledSourceFunction(Iterator<T> source, long elementsPerSecond) {
        this.throttledIterator = new ThrottledIterator<>(source, elementsPerSecond);
    }
    
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (isRunning && throttledIterator.hasNext()) {
            // next() may sleep, so call it before taking the checkpoint lock
            // to avoid delaying checkpoints while the source is throttled
            final T element = throttledIterator.next();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
    }
    
    @Override
    public void cancel() {
        isRunning = false;
    }
}

Source Builder Pattern

public static <T> DataStream<T> createThrottledStream(
    StreamExecutionEnvironment env,
    Iterator<T> data,
    long elementsPerSecond) {
    
    return env.addSource(new ThrottledSourceFunction<>(data, elementsPerSecond));
}

// Usage: serializableSource stands for any Iterator<String> that also implements Serializable
DataStream<String> throttledStream = createThrottledStream(
    env, 
    serializableSource, 
    50 // 50 elements per second
);

Related Utilities

While ThrottledIterator is the primary utility class in this package, it's commonly used with other data generation utilities:

Sample Data Providers

  • WindowJoinSampleData: Sample data for join examples
  • TwitterExampleData: Sample tweet data for offline testing
  • TopSpeedWindowingExampleData: Sample car telemetry data
  • SessionWindowingData: Sample clickstream data

Usage with Sample Data

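// Throttled join data. The wrapped iterator must implement Serializable;
// WindowJoinSampleData.GradeSource is assumed to do so in the 1.3.x examples
// (verify against the version you use).
ThrottledIterator<Tuple2<String, Integer>> gradeData = 
    new ThrottledIterator<>(
        new WindowJoinSampleData.GradeSource(),
        elementsPerSecond
    );

// Throttled windowing data. carIterator stands for any Serializable
// Iterator<Tuple4<Integer, Integer, Double, Long>>, for example one that
// parses the records in TopSpeedWindowingExampleData.
ThrottledIterator<Tuple4<Integer, Integer, Double, Long>> carData = 
    new ThrottledIterator<>(
        carIterator,
        carsPerSecond
    );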

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

import java.io.Serializable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10
