Apache Flink streaming examples demonstrating various stream processing patterns and use cases
—
Shared utility classes and data generators used across multiple examples. Provides common functionality for rate limiting, data generation, and stream control.
Rate-limited iterator for controlling data emission speed in streaming examples.
/**
* Iterator decorator that limits how quickly elements are taken from a wrapped source iterator.
*
* <p>Used by the streaming examples to produce data at a steady, bounded rate. The class is
* {@link Serializable} so it can be shipped inside a Flink job; for the same reason the
* constructor only accepts source iterators that are themselves {@link Serializable}
* (iterators obtained from JDK collections, e.g. {@code list.iterator()}, are not).
*
* <p>{@code next()} updates a batch counter and a timestamp without any synchronization,
* so an instance is presumably meant to be consumed by a single thread.
*
* @param <T> Type of elements produced by the wrapped iterator
*/
public class ThrottledIterator<T> implements Iterator<T>, Serializable {
/**
* Wraps {@code source} so that at most {@code elementsPerSecond} elements are handed out per second.
*
* @param source Source iterator to wrap; must not be null and must implement {@link Serializable}
* @param elementsPerSecond Upper bound on the emission rate; must be at least 1
* @throws NullPointerException if {@code source} is null
* @throws IllegalArgumentException if {@code source} is not Serializable or
*     {@code elementsPerSecond} is less than 1
*/
public ThrottledIterator(Iterator<T> source, long elementsPerSecond);
/**
* Reports whether the wrapped source iterator has more elements.
* @return true if source has more elements
*/
public boolean hasNext();
/**
* Returns the next element of the wrapped source, sleeping first if needed to respect the rate.
*
* <p>Elements are counted in batches. When a batch completes before its time window has
* elapsed, the calling thread sleeps for the remainder of the window. If that sleep is
* interrupted, the thread's interrupt flag is restored and the element is returned without
* further delay.
*
* @return Next element from source iterator; whatever {@code source.next()} throws
*     (presumably NoSuchElementException when exhausted) propagates unchanged
*/
public T next();
/**
* Remove operation is not supported.
* @throws UnsupportedOperationException always
*/
public void remove();
}The ThrottledIterator implements sophisticated rate limiting:
// Rate calculation for different throughput ranges.
// Both branches round the batch window *up*, so the effective rate never exceeds
// elementsPerSecond, which is documented as the maximum emission rate.
if (elementsPerSecond >= 100) {
    // High throughput: emit in batches of about 1/20 of the rate, one batch per ~50ms window.
    this.sleepBatchSize = elementsPerSecond / 20;
    // Derive the window from the (truncated) batch size instead of hard-coding 50ms;
    // otherwise e.g. 119/s would silently become 5 elements per 50ms = 100/s.
    // Floating point avoids overflowing 1000 * sleepBatchSize for huge rates such as
    // Long.MAX_VALUE (the window then stays at ~50ms).
    this.sleepBatchTime = (long) Math.ceil(1000.0 * this.sleepBatchSize / elementsPerSecond);
} else if (elementsPerSecond >= 1) {
    // Low throughput: one element per window of ceil(1000 / elementsPerSecond) ms.
    // Truncating instead would overshoot the rate (e.g. 91/s -> 10ms -> 100/s).
    this.sleepBatchSize = 1;
    this.sleepBatchTime = (1000 + elementsPerSecond - 1) / elementsPerSecond;
} else {
    // Same message as the constructor's up-front validation.
    throw new IllegalArgumentException("'elements per second' must be positive and not zero");
}
@Override
public T next() {
    // Returns the next source element, sleeping first when needed to hold the configured rate.
    // Throttling only starts once a reference timestamp exists; the very first call just
    // records it and returns immediately.
    if (lastBatchCheckTime > 0) {
        if (++num >= sleepBatchSize) {
            // A batch is complete: wait out whatever remains of its time window.
            num = 0;
            final long elapsed = System.currentTimeMillis() - lastBatchCheckTime;
            if (elapsed < sleepBatchTime) {
                try {
                    Thread.sleep(sleepBatchTime - elapsed);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and emit without further delay.
                    Thread.currentThread().interrupt();
                }
            }
            // Start the next window only *after* the sleep. Re-using the pre-sleep
            // timestamp would charge this sleep to the following batch, which would then
            // skip throttling entirely -- letting roughly every other batch through
            // unthrottled and about doubling the effective rate.
            lastBatchCheckTime = System.currentTimeMillis();
        }
    } else {
        lastBatchCheckTime = System.currentTimeMillis();
    }
    return source.next();
}
// Control emission rate of sample data
// ThrottledIterator only accepts java.io.Serializable sources, and JDK collection iterators
// (including Arrays.asList(...).iterator()) are not Serializable -- passing one makes the
// constructor throw IllegalArgumentException. Iterate over a serializable snapshot instead.
// In a non-static context a local class can capture the enclosing instance (which would then
// be serialized too); declare it as a static nested class there.
class SerializableListIterator<E> implements Iterator<E>, java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private final java.util.ArrayList<E> elements;
    private int position;

    SerializableListIterator(java.util.Collection<? extends E> source) {
        this.elements = new java.util.ArrayList<>(source);
    }

    @Override
    public boolean hasNext() {
        return position < elements.size();
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException();
        }
        return elements.get(position++);
    }
}

Iterator<String> sampleData =
        new SerializableListIterator<>(Arrays.asList("data1", "data2", "data3"));
ThrottledIterator<String> throttledData = new ThrottledIterator<>(sampleData, 10); // 10 elements/sec
// ThrottledIterator is a plain Iterator, not a SourceFunction; fromCollection(Iterator, Class)
// wraps the (serializable) iterator in a non-parallel source.
DataStream<String> stream = env.fromCollection(throttledData, String.class);
// Throttled data sources for window join examples
// ThrottledIterator is a plain Iterator, not a SourceFunction, so it cannot be passed to
// addSource(). fromCollection(Iterator, TypeInformation) wraps it in a non-parallel source.
// The element type must be given explicitly, and a Class object cannot express Tuple3's
// generic arguments, hence the TypeHint.
// NOTE(review): ThrottledIterator rejects sources that are not java.io.Serializable, and JDK
// collection iterators are not -- confirm that ORANGE_DATA / GREEN_DATA hand out serializable
// iterators.
org.apache.flink.api.common.typeinfo.TypeInformation<Tuple3<String, String, Long>> joinRecordType =
        org.apache.flink.api.common.typeinfo.TypeInformation.of(
                new org.apache.flink.api.common.typeinfo.TypeHint<Tuple3<String, String, Long>>() {});

DataStream<Tuple3<String, String, Long>> orangeStream = env
        .fromCollection(
                new ThrottledIterator<>(OrangeSourceData.ORANGE_DATA.iterator(), elementsPerSecond),
                joinRecordType)
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
DataStream<Tuple3<String, String, Long>> greenStream = env
        .fromCollection(
                new ThrottledIterator<>(GreenSourceData.GREEN_DATA.iterator(), elementsPerSecond),
                joinRecordType)
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
// Generate controlled load for performance testing
List<TestEvent> testData = generateTestData(10000);
// JDK List implementations hand out iterators that are not java.io.Serializable, so
// ThrottledIterator would reject testData.iterator() with an IllegalArgumentException.
// Iterate over a serializable snapshot instead (TestEvent itself must be Serializable for the
// snapshot to ship with the job -- confirm). In a non-static context, declare this as a static
// nested class so no enclosing instance is captured.
class SerializableListIterator<E> implements Iterator<E>, java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private final java.util.ArrayList<E> elements;
    private int position;

    SerializableListIterator(java.util.Collection<? extends E> source) {
        this.elements = new java.util.ArrayList<>(source);
    }

    @Override
    public boolean hasNext() {
        return position < elements.size();
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException();
        }
        return elements.get(position++);
    }
}

ThrottledIterator<TestEvent> loadGenerator = new ThrottledIterator<>(
        new SerializableListIterator<>(testData),
        1000 // 1000 events per second
);
// fromCollection(Iterator, Class) wraps the throttled iterator in a non-parallel source.
DataStream<TestEvent> loadStream = env.fromCollection(loadGenerator, TestEvent.class);
// High throughput: 1000 elements/second
// Batches of 50 elements every 50ms
ThrottledIterator<T> highThroughput = new ThrottledIterator<>(source, 1000);
// Results in:
// sleepBatchSize = 50 (1000/20)
// sleepBatchTime = 50ms
// Emits a batch of 50 elements, then sleeps for whatever remains of the 50ms window
// Low throughput: 2 elements/second
// One element per 500ms window
ThrottledIterator<T> lowThroughput = new ThrottledIterator<>(source, 2);
// Results in:
// sleepBatchSize = 1
// sleepBatchTime = 500ms (1000/2)
// Emits 1 element, then sleeps for whatever remains of the 500ms window
// Burst testing: effectively no throttling -- the batch size (Long.MAX_VALUE / 20)
// is never reached in practice, so the iterator never sleeps
ThrottledIterator<T> burstMode = new ThrottledIterator<>(source, Long.MAX_VALUE);
// Trickle mode: 1 element/second (sleepBatchSize = 1, sleepBatchTime = 1000ms)
ThrottledIterator<T> trickleMode = new ThrottledIterator<>(source, 1);
// Realistic rate: 100 elements/second (batches of 5 elements per 50ms window)
// NOTE(review): these examples all wrap the same `source`; real code would give each
// ThrottledIterator its own iterator, since they would otherwise consume it jointly.
ThrottledIterator<T> realisticRate = new ThrottledIterator<>(source, 100);public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
// Constructor: validates both arguments, then derives the batch size and window length.
// Throws NullPointerException for a null source.
this.source = requireNonNull(source);
// Flink must serialize the wrapped source together with this iterator, so reject
// non-serializable sources at construction time rather than at job submission.
if (!(source instanceof Serializable)) {
throw new IllegalArgumentException("source must be java.io.Serializable");
}
// Validate rate parameter: at least one element per second.
if (elementsPerSecond < 1) {
throw new IllegalArgumentException("'elements per second' must be positive and not zero");
}
// Configure rate limiting parameters... (elided in this excerpt: the computation of
// sleepBatchSize / sleepBatchTime from elementsPerSecond)
}try {
Thread.sleep(sleepBatchTime - elapsed);
} catch (InterruptedException e) {
// Restore interrupt flag and proceed
Thread.currentThread().interrupt();
}
System.currentTimeMillis() for timing
/**
 * Flink source that emits the elements of an iterator at a bounded rate.
 *
 * <p>The iterator is wrapped in a {@link ThrottledIterator}, so it has to be
 * {@code java.io.Serializable} and the rate has to be at least one element per second;
 * otherwise construction fails with an {@link IllegalArgumentException}.
 *
 * @param <T> type of the emitted elements
 */
public class ThrottledSourceFunction<T> implements SourceFunction<T> {
    private final ThrottledIterator<T> throttledIterator;
    // Written by cancel() and read by the emission loop in run().
    private volatile boolean isRunning = true;

    /**
     * @param source iterator supplying the elements; must be Serializable
     * @param elementsPerSecond maximum emission rate, at least 1
     */
    public ThrottledSourceFunction(Iterator<T> source, long elementsPerSecond) {
        this.throttledIterator = new ThrottledIterator<>(source, elementsPerSecond);
    }

    /** Emits elements until the iterator is exhausted or the source is cancelled. */
    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (isRunning && throttledIterator.hasNext()) {
            // next() may sleep for up to a whole throttling window. Fetch the element
            // *before* taking the checkpoint lock so those pauses never hold up
            // checkpoints; only the emission itself must be atomic w.r.t. checkpointing.
            final T element = throttledIterator.next();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
    }

    /** Makes the emission loop in run() stop at its next iteration check. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

/**
 * Creates a stream emitting {@code data} at no more than {@code elementsPerSecond}.
 *
 * <p>Flink derives a source's output type from the function's class, but {@code T} is an
 * erased type variable here, so the returned stream may lack usable type information.
 * Prefer the overload that takes an explicit {@code TypeInformation}.
 */
public static <T> DataStream<T> createThrottledStream(
        StreamExecutionEnvironment env,
        Iterator<T> data,
        long elementsPerSecond) {
    return env.addSource(new ThrottledSourceFunction<>(data, elementsPerSecond));
}

/**
 * Same as the three-argument variant, but with the element type given explicitly, because
 * Flink cannot infer it for a generic {@code T}.
 *
 * @param type type information of the emitted elements
 */
public static <T> DataStream<T> createThrottledStream(
        StreamExecutionEnvironment env,
        Iterator<T> data,
        long elementsPerSecond,
        org.apache.flink.api.common.typeinfo.TypeInformation<T> type) {
    return env.addSource(new ThrottledSourceFunction<>(data, elementsPerSecond), type);
}
// Usage
// NOTE(review): sampleData.iterator() must yield a java.io.Serializable iterator (JDK
// collection iterators are not, and ThrottledIterator would reject them) -- confirm.
DataStream<String> throttledStream = createThrottledStream(
        env,
        sampleData.iterator(),
        50, // 50 elements per second
        org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
);
While ThrottledIterator is the primary utility class in this package, it's commonly used with other data generation utilities:
// JDK collection iterators -- including Arrays.asList(...).iterator() -- do not implement
// java.io.Serializable, so ThrottledIterator's constructor rejects them with an
// IllegalArgumentException. Iterate over a serializable snapshot instead (the elements must
// be Serializable too). In a non-static context, declare this as a static nested class so no
// enclosing instance is captured.
class SerializableListIterator<E> implements Iterator<E>, java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private final java.util.ArrayList<E> elements;
    private int position;

    SerializableListIterator(java.util.Collection<? extends E> source) {
        this.elements = new java.util.ArrayList<>(source);
    }

    @Override
    public boolean hasNext() {
        return position < elements.size();
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException();
        }
        return elements.get(position++);
    }
}

// Throttled join data
ThrottledIterator<Tuple3<String, String, Long>> orangeData =
        new ThrottledIterator<>(
                new SerializableListIterator<>(Arrays.asList(WindowJoinSampleData.ORANGE_DATA)),
                elementsPerSecond
        );
// Throttled windowing data
ThrottledIterator<Tuple4<Integer, Integer, Double, Long>> carData =
        new ThrottledIterator<>(
                new SerializableListIterator<>(Arrays.asList(TopSpeedWindowingExampleData.CAR_DATA)),
                carsPerSecond
        );
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.3</version>
</dependency>import java.io.Serializable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10