# Utility Classes

Shared utility classes and data generators used across multiple examples. They provide common functionality for rate limiting, data generation, and stream control.

## Capabilities

### ThrottledIterator

Rate-limited iterator for controlling data emission speed in streaming examples.

```java { .api }
/**
 * Iterator that throttles the rate at which elements are emitted.
 * Controls data flow rate for testing and demonstration purposes.
 *
 * @param <T> Type of elements being iterated
 */
public class ThrottledIterator<T> implements Iterator<T>, Serializable {

    /**
     * Creates a throttled iterator with the specified emission rate.
     *
     * @param source Source iterator to wrap (must also implement Serializable)
     * @param elementsPerSecond Target number of elements to emit per second (at least 1)
     * @throws NullPointerException if source is null
     * @throws IllegalArgumentException if source is not Serializable or elementsPerSecond is less than 1
     */
    public ThrottledIterator(Iterator<T> source, long elementsPerSecond);

    /**
     * Checks whether more elements are available.
     *
     * @return true if the source iterator has more elements
     */
    public boolean hasNext();

    /**
     * Returns the next element with rate limiting applied.
     * Blocks if necessary to maintain the specified emission rate.
     *
     * @return Next element from the source iterator
     */
    public T next();

    /**
     * The remove operation is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void remove();
}
```

## Rate Limiting Implementation
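
### Throttling Logic

The constructor translates the requested rate into two parameters: a batch size (`sleepBatchSize`, elements per batch) and a batch window (`sleepBatchTime`, in milliseconds). Rates of 100 elements/second or more are throttled in batches over 50 ms windows; rates from 1 to 99 elements/second are throttled one element at a time:
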
```java
// Rate calculation for different throughput ranges (inside the constructor)
if (elementsPerSecond >= 100) {
    // High throughput: one batch of elements per 50 ms window
    this.sleepBatchSize = elementsPerSecond / 20; // elements per 50 ms batch
    this.sleepBatchTime = 50;                     // 50 ms windows
} else if (elementsPerSecond >= 1) {
    // Low throughput: one element per window
    this.sleepBatchSize = 1;                        // one element at a time
    this.sleepBatchTime = 1000 / elementsPerSecond; // window per element in ms (integer division)
} else {
    throw new IllegalArgumentException("'elements per second' must be positive and not zero");
}
```
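
To see how integer division affects the resulting rate, the arithmetic above can be restated as a standalone function. This is an illustrative sketch only; `ThrottleParams`, `compute`, and `nominalRate` are hypothetical names, not part of Flink.

```java
/**
 * Standalone restatement of the constructor arithmetic (hypothetical helper,
 * not part of Flink). Returns {sleepBatchSize, sleepBatchTime in ms}.
 */
final class ThrottleParams {

    static long[] compute(long elementsPerSecond) {
        if (elementsPerSecond >= 100) {
            // One batch per 50 ms window; integer division truncates
            return new long[] {elementsPerSecond / 20, 50};
        } else if (elementsPerSecond >= 1) {
            // One element per window; integer division truncates
            return new long[] {1, 1000 / elementsPerSecond};
        }
        throw new IllegalArgumentException("'elements per second' must be positive and not zero");
    }

    /** Nominal rate implied by the parameters (one batch per window), in elements/second. */
    static double nominalRate(long elementsPerSecond) {
        long[] params = compute(elementsPerSecond);
        return params[0] * 1000.0 / params[1];
    }
}
```

For example, 1000 elements/second gives batches of 50 every 50 ms, a request for 110 elements/second gives batches of 5 every 50 ms (a nominal 100 elements/second), and 3 elements/second gives a 333 ms window per element.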
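
### Timing Control

`next()` counts emitted elements and, at each batch boundary, sleeps for whatever remains of the current batch window before returning the next element:
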
```java
@Override
public T next() {
    // Apply the rate-limiting delay if necessary
    if (lastBatchCheckTime > 0) {
        if (++num >= sleepBatchSize) {
            // Batch complete: reset the counter and check the batch window
            num = 0;

            final long now = System.currentTimeMillis();
            final long elapsed = now - lastBatchCheckTime;

            if (elapsed < sleepBatchTime) {
                try {
                    // Sleep for whatever remains of the batch window
                    Thread.sleep(sleepBatchTime - elapsed);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            lastBatchCheckTime = now;
        }
    } else {
        // First call: start timing without delaying
        lastBatchCheckTime = System.currentTimeMillis();
    }

    return source.next();
}
```
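
The timing logic is easier to inspect without real sleeps. The sketch below mirrors the `next()` logic shown above but replaces `System.currentTimeMillis()` and `Thread.sleep` with a fake clock that only advances when the code would sleep, and it returns the emission time instead of an element. `ThrottleSimulation` and `schedule` are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Deterministic simulation of the next() logic above (hypothetical, not a
 * Flink class). "Sleeping" advances a fake clock instead of blocking.
 */
final class ThrottleSimulation {

    private final long sleepBatchSize;
    private final long sleepBatchTime;
    private long clock;              // fake wall clock in ms
    private long lastBatchCheckTime; // same role as in ThrottledIterator
    private long num;                // same role as in ThrottledIterator

    ThrottleSimulation(long sleepBatchSize, long sleepBatchTime, long startMillis) {
        this.sleepBatchSize = sleepBatchSize;
        this.sleepBatchTime = sleepBatchTime;
        this.clock = startMillis; // must be > 0, as the real clock always is
    }

    /** Mirrors next() and returns the fake time at which the element is emitted. */
    long next() {
        if (lastBatchCheckTime > 0) {
            if (++num >= sleepBatchSize) {
                num = 0;
                final long now = clock;
                final long elapsed = now - lastBatchCheckTime;
                if (elapsed < sleepBatchTime) {
                    clock += sleepBatchTime - elapsed; // stands in for Thread.sleep
                }
                lastBatchCheckTime = now; // timestamp read before the "sleep"
            }
        } else {
            lastBatchCheckTime = clock;
        }
        return clock;
    }

    /** Emission times for a consumer that calls next() back to back. */
    static List<Long> schedule(long batchSize, long batchTime, int elements) {
        ThrottleSimulation sim = new ThrottleSimulation(batchSize, batchTime, 1_000L);
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            times.add(sim.next());
        }
        return times;
    }
}
```

For 2 elements/second (batch size 1, 500 ms window) and a consumer that never pauses, `schedule(1, 500, 6)` yields emission times of 1000, 1500, 1500, 2000, 2000 and 2500 ms. Because `lastBatchCheckTime` is set to the timestamp read before the sleep, the sleep counts toward the following window, so every other check finds the window already elapsed and lets a batch through immediately; under these conditions the logic admits about two batches per window. Recording the time after the sleep would be one way to tighten this.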

## Usage Patterns

### Stream Source Rate Control

```java
// Control the emission rate of sample data.
// ThrottledIterator rejects sources that are not java.io.Serializable, so a plain
// List iterator such as Arrays.asList(...).iterator() would fail here;
// Flink's NumberSequenceIterator (org.apache.flink.util) is Serializable.
Iterator<Long> sampleData = new NumberSequenceIterator(1L, 100L);
ThrottledIterator<Long> throttledData = new ThrottledIterator<>(sampleData, 10); // 10 elements/sec

DataStream<Long> stream = env.fromCollection(throttledData, Long.class);
```
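
### Join Example Integration

```java
// Throttled data sources for window join examples.
// addSource() expects a SourceFunction, so the throttled iterators are passed to
// fromCollection() with explicit type information. The wrapped iterators must
// implement java.io.Serializable, or ThrottledIterator's constructor rejects them.
TypeInformation<Tuple3<String, String, Long>> tupleType =
        TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {});

DataStream<Tuple3<String, String, Long>> orangeStream = env
        .fromCollection(new ThrottledIterator<>(
                OrangeSourceData.ORANGE_DATA.iterator(),
                elementsPerSecond
        ), tupleType)
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

DataStream<Tuple3<String, String, Long>> greenStream = env
        .fromCollection(new ThrottledIterator<>(
                GreenSourceData.GREEN_DATA.iterator(),
                elementsPerSecond
        ), tupleType)
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
```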

### Load Testing Applications

```java
// Generate controlled load for performance testing.
// TestEvent and generateTestData(...) are placeholders; the returned iterator
// must also implement java.io.Serializable (a plain List iterator does not).
Iterator<TestEvent> testData = generateTestData(10000);
ThrottledIterator<TestEvent> loadGenerator = new ThrottledIterator<>(
        testData,
        1000 // 1000 events per second
);

DataStream<TestEvent> loadStream = env.fromCollection(loadGenerator, TestEvent.class);
```

## Configuration Examples

### High Throughput Configuration

```java
// High throughput: 1000 elements/second
// Batches of 50 elements every 50 ms
ThrottledIterator<T> highThroughput = new ThrottledIterator<>(source, 1000);

// Results in:
// sleepBatchSize = 50 (1000 / 20)
// sleepBatchTime = 50 ms
// Emits 50 elements, then sleeps for whatever remains of the 50 ms window
```

### Low Throughput Configuration

```java
// Low throughput: 2 elements/second
// One element per 500 ms window
ThrottledIterator<T> lowThroughput = new ThrottledIterator<>(source, 2);

// Results in:
// sleepBatchSize = 1
// sleepBatchTime = 500 ms (1000 / 2)
// Emits 1 element, then sleeps for whatever remains of the 500 ms window
```

### Testing Scenarios

```java
// Burst testing: effectively no throttling
// (sleepBatchSize = Long.MAX_VALUE / 20, which is never reached in practice)
ThrottledIterator<T> burstMode = new ThrottledIterator<>(source, Long.MAX_VALUE);

// Trickle mode: very slow emission (1 element/second)
ThrottledIterator<T> trickleMode = new ThrottledIterator<>(source, 1);

// Realistic rate: moderate throughput (batches of 5 every 50 ms)
ThrottledIterator<T> realisticRate = new ThrottledIterator<>(source, 100);
```

## Error Handling

### Source Validation

The constructor rejects a `null` source, a source that is not `java.io.Serializable`, and a rate below 1 element per second:

```java
public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
    this.source = requireNonNull(source); // NullPointerException if source is null

    // Ensure the source is serializable so Flink can ship it with the job
    if (!(source instanceof Serializable)) {
        throw new IllegalArgumentException("source must be java.io.Serializable");
    }

    // Validate the rate parameter
    if (elementsPerSecond < 1) {
        throw new IllegalArgumentException("'elements per second' must be positive and not zero");
    }

    // Configure rate limiting parameters...
}
```
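
Standard collection iterators do not implement `Serializable`, which is why passing `someList.iterator()` fails this check. A common workaround is a small generator-style iterator that implements both interfaces. The sketch below uses only the JDK; `RangeIterator` and `roundTrip` are hypothetical helpers. It shows a list iterator failing Java serialization and a serializable iterator surviving a round trip with its position intact.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical generator-style source yielding from, from + 1, ..., end - 1. */
final class RangeIterator implements Iterator<Integer>, Serializable {

    private static final long serialVersionUID = 1L;

    private int current;
    private final int end;

    RangeIterator(int from, int end) {
        this.current = from;
        this.end = end;
    }

    @Override
    public boolean hasNext() {
        return current < end;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current++;
    }

    /** Java-serializes a value and reads it back; wraps any failure in IllegalStateException. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value); // NotSerializableException for non-serializable values
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```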

### Interrupt Handling

An `InterruptedException` during the throttling sleep does not abort `next()`: the interrupt flag is restored and the element is still returned.

```java
try {
    Thread.sleep(sleepBatchTime - elapsed);
} catch (InterruptedException e) {
    // Restore the interrupt flag and proceed
    Thread.currentThread().interrupt();
}
```
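
The effect of this idiom can be checked in isolation. In the sketch below (`InterruptIdiom` and `sleepAndProceed` are hypothetical names), a thread whose interrupt flag is already set returns from the sleep immediately, and the flag is still set afterwards.

```java
/**
 * The "restore interrupt flag and proceed" idiom from next(), in isolation
 * (hypothetical helper, not part of Flink).
 */
final class InterruptIdiom {

    /** Sleeps, swallowing an interrupt but restoring the flag; reports the flag afterwards. */
    static boolean sleepAndProceed(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws, so set it again
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().isInterrupted();
    }
}
```

One consequence for `ThrottledIterator`: while the flag stays set, every later `Thread.sleep` in `next()` also throws immediately, so the iterator keeps returning elements without throttling. A caller that interrupts the consuming thread should therefore also stop iterating, for example by checking `Thread.currentThread().isInterrupted()` in its loop.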
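
## Performance Characteristics

### Memory Usage

- **Minimal State**: Only tracks timing information and a batch counter
- **No Buffering**: Wraps the existing iterator without copying data
- **Serializable**: Can be shipped to Flink task managers as part of a job, provided the wrapped source is serializable

### Accuracy

- **Millisecond Precision**: Uses `System.currentTimeMillis()` for timing
- **Batch Optimization**: At 100 elements/second and above, the clock is checked once per batch rather than once per element
- **Elapsed-Time Compensation**: At each batch boundary, sleeps only for whatever remains of the batch window, so time spent producing and processing elements counts toward the window

### Throughput Ranges

- **High Rate (≥100/sec)**: Batches of `elementsPerSecond / 20` elements per 50 ms window
- **Low Rate (1-99/sec)**: One element per window of `1000 / elementsPerSecond` ms
- **Invalid Rate (<1/sec)**: Throws `IllegalArgumentException`

Both formulas use integer division, so the nominal rate can be slightly lower than requested; for example, 110 elements/second yields batches of 5 per 50 ms window, i.e. 100 elements/second.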

## Integration with Flink Sources

### Custom Source Function

```java
import java.util.Iterator;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.examples.utils.ThrottledIterator;

public class ThrottledSourceFunction<T> implements SourceFunction<T> {

    private static final long serialVersionUID = 1L;

    private final ThrottledIterator<T> throttledIterator;
    private volatile boolean isRunning = true;

    public ThrottledSourceFunction(Iterator<T> source, long elementsPerSecond) {
        this.throttledIterator = new ThrottledIterator<>(source, elementsPerSecond);
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (isRunning && throttledIterator.hasNext()) {
            // next() may sleep, so call it outside the checkpoint lock
            T element = throttledIterator.next();
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
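
The control flow of `run()` and `cancel()` does not depend on Flink itself. The sketch below reproduces it in plain Java, with a `Consumer` standing in for `SourceContext` and a private object standing in for the checkpoint lock; `CancellableDrain` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/**
 * Plain-Java analog of a run()/cancel() source loop
 * (hypothetical; not a Flink class).
 */
final class CancellableDrain<T> {

    // Stands in for ctx.getCheckpointLock()
    private final Object lock = new Object();

    // Written by cancel(), read by the run() loop, possibly from another thread
    private volatile boolean isRunning = true;

    /** Drains the source into the collector until it is exhausted or cancelled. */
    void run(Iterator<T> source, Consumer<T> collector) {
        while (isRunning && source.hasNext()) {
            // Fetch first: for a throttled source this is where the sleep happens
            T element = source.next();
            // Hold the lock only while emitting
            synchronized (lock) {
                collector.accept(element);
            }
        }
    }

    void cancel() {
        isRunning = false;
    }
}
```

In Flink, `cancel()` is called from a different thread than `run()`; declaring `isRunning` as `volatile` makes the write visible to the loop. Fetching the element before entering the `synchronized` block keeps a throttling sleep from holding the lock.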

### Source Builder Pattern

```java
public static <T> DataStream<T> createThrottledStream(
        StreamExecutionEnvironment env,
        Iterator<T> data,          // must also implement java.io.Serializable
        Class<T> type,             // explicit type, since T is erased at runtime
        long elementsPerSecond) {

    return env.addSource(
            new ThrottledSourceFunction<>(data, elementsPerSecond),
            TypeInformation.of(type));
}

// Usage: NumberSequenceIterator (org.apache.flink.util) is Serializable
DataStream<Long> throttledStream = createThrottledStream(
        env,
        new NumberSequenceIterator(1L, 1000L),
        Long.class,
        50 // 50 elements per second
);
```
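
## Related Utilities

ThrottledIterator is the primary utility class in this package, and it is commonly used together with the examples' data generation utilities:

### Sample Data Providers

- **WindowJoinSampleData**: Sample data for join examples
- **TwitterExampleData**: Sample tweet data for offline testing
- **TopSpeedWindowingExampleData**: Sample car telemetry data
- **SessionWindowingData**: Sample clickstream data

### Usage with Sample Data

```java
// Arrays.asList(...).iterator() is not java.io.Serializable, so ThrottledIterator
// would reject it. ArrayIterator is a hypothetical serializable wrapper.
static final class ArrayIterator<E> implements Iterator<E>, Serializable {
    private final E[] elements;
    private int position;
    ArrayIterator(E[] elements) { this.elements = elements; }
    @Override public boolean hasNext() { return position < elements.length; }
    @Override public E next() {
        if (!hasNext()) throw new NoSuchElementException();
        return elements[position++];
    }
}

// Throttled join data (assumes ORANGE_DATA is an array of Tuple3<String, String, Long>)
ThrottledIterator<Tuple3<String, String, Long>> orangeData =
        new ThrottledIterator<>(new ArrayIterator<>(WindowJoinSampleData.ORANGE_DATA), elementsPerSecond);

// Throttled windowing data (assumes CAR_DATA is an array of Tuple4<Integer, Integer, Double, Long>)
ThrottledIterator<Tuple4<Integer, Integer, Double, Long>> carData =
        new ThrottledIterator<>(new ArrayIterator<>(TopSpeedWindowingExampleData.CAR_DATA), carsPerSecond);
```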

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

```java
import java.io.Serializable;
import java.util.Iterator;
import static java.util.Objects.requireNonNull;
```