or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md · external-systems.md · index.md · iteration.md · joins.md · machine-learning.md · side-output.md · socket.md · utilities.md · windowing.md · wordcount.md

docs/utilities.md

0

# Utility Classes

1

2

Shared utility classes and data generators used across multiple examples. They provide common functionality for rate limiting, data generation, and stream control.

3

4

## Capabilities

5

6

### ThrottledIterator

7

8

Rate-limited iterator for controlling data emission speed in streaming examples.

9

10

```java { .api }

11

/**

12

* Iterator that supports throttling the emission rate

13

* Controls data flow rate for testing and demonstration purposes

14

* @param <T> Type of elements being iterated

15

*/

16

public class ThrottledIterator<T> implements Iterator<T>, Serializable {

17

18

/**

19

* Creates a throttled iterator with specified emission rate

20

* @param source Source iterator to wrap (must be Serializable)

21

* @param elementsPerSecond Maximum elements to emit per second

22

* @throws IllegalArgumentException if source is not Serializable or rate is invalid

23

*/

24

public ThrottledIterator(Iterator<T> source, long elementsPerSecond);

25

26

/**

27

* Checks if more elements are available

28

* @return true if source has more elements

29

*/

30

public boolean hasNext();

31

32

/**

33

* Returns next element with rate limiting applied

34

* Blocks if necessary to maintain specified emission rate

35

* @return Next element from source iterator

36

*/

37

public T next();

38

39

/**

40

* Remove operation is not supported

41

* @throws UnsupportedOperationException always

42

*/

43

public void remove();

44

}

45

```

46

47

## Rate Limiting Implementation

48

49

### Throttling Logic

50

51

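The ThrottledIterator selects one of two throttling strategies from the requested rate — batched sleeps every 50ms at 100 or more elements per second, and a per-element delay below that: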

52

53

```java

54

// Rate calculation for different throughput ranges

55

if (elementsPerSecond >= 100) {

56

// High throughput: batch processing every 50ms

57

this.sleepBatchSize = elementsPerSecond / 20; // Elements per 50ms batch

58

this.sleepBatchTime = 50; // 50ms batches

59

} else if (elementsPerSecond >= 1) {

60

// Low throughput: per-element delays

61

this.sleepBatchSize = 1; // One element at a time

62

this.sleepBatchTime = 1000 / elementsPerSecond; // Delay per element

63

} else {

64

throw new IllegalArgumentException("Elements per second must be positive and not zero");

65

}

66

```

67

68

### Timing Control

69

70

```java

71

@Override

72

public T next() {

73

// Apply rate limiting delay if necessary

74

if (lastBatchCheckTime > 0) {

75

if (++num >= sleepBatchSize) {

76

num = 0;

77

78

final long now = System.currentTimeMillis();

79

final long elapsed = now - lastBatchCheckTime;

80

81

if (elapsed < sleepBatchTime) {

82

try {

83

Thread.sleep(sleepBatchTime - elapsed);

84

} catch (InterruptedException e) {

85

Thread.currentThread().interrupt();

86

}

87

}

88

lastBatchCheckTime = now;

89

}

90

} else {

91

lastBatchCheckTime = System.currentTimeMillis();

92

}

93

94

return source.next();

95

}

96

```

97

98

## Usage Patterns

99

100

### Stream Source Rate Control

101

102

```java

103

// Control emission rate of sample data

104

Iterator<String> sampleData = Arrays.asList("data1", "data2", "data3").iterator();

105

ThrottledIterator<String> throttledData = new ThrottledIterator<>(sampleData, 10); // 10 elements/sec

106

107

DataStream<String> stream = env.addSource(new IteratorSourceFunction<>(throttledData));

108

```

109

110

### Join Example Integration

111

112

```java

113

// Throttled data sources for window join examples

114

DataStream<Tuple3<String, String, Long>> orangeStream = env

115

.addSource(new ThrottledIterator<>(

116

OrangeSourceData.ORANGE_DATA.iterator(),

117

elementsPerSecond

118

))

119

.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

120

121

DataStream<Tuple3<String, String, Long>> greenStream = env

122

.addSource(new ThrottledIterator<>(

123

GreenSourceData.GREEN_DATA.iterator(),

124

elementsPerSecond

125

))

126

.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

127

```

128

129

### Load Testing Applications

130

131

```java

132

// Generate controlled load for performance testing

133

List<TestEvent> testData = generateTestData(10000);

134

ThrottledIterator<TestEvent> loadGenerator = new ThrottledIterator<>(

135

testData.iterator(),

136

1000 // 1000 events per second

137

);

138

139

DataStream<TestEvent> loadStream = env.addSource(

140

new IteratorSourceFunction<>(loadGenerator)

141

);

142

```

143

144

## Configuration Examples

145

146

### High Throughput Configuration

147

148

```java

149

// High throughput: 1000 elements/second

150

// Batches of 50 elements every 50ms

151

ThrottledIterator<T> highThroughput = new ThrottledIterator<>(source, 1000);

152

153

// Results in:

154

// sleepBatchSize = 50 (1000/20)

155

// sleepBatchTime = 50ms

156

// Emits 50 elements, then sleeps for remaining time in 50ms window

157

```

158

159

### Low Throughput Configuration

160

161

```java

162

// Low throughput: 2 elements/second

163

// Individual element delays of 500ms

164

ThrottledIterator<T> lowThroughput = new ThrottledIterator<>(source, 2);

165

166

// Results in:

167

// sleepBatchSize = 1

168

// sleepBatchTime = 500ms (1000/2)

169

// Emits 1 element, then sleeps for remaining time in 500ms window

170

```

171

172

### Testing Scenarios

173

174

```java

175

// Burst testing: No throttling

176

ThrottledIterator<T> burstMode = new ThrottledIterator<>(source, Long.MAX_VALUE);

177

178

// Trickle mode: Very slow emission

179

ThrottledIterator<T> trickleMode = new ThrottledIterator<>(source, 1);

180

181

// Realistic rate: Moderate throughput

182

ThrottledIterator<T> realisticRate = new ThrottledIterator<>(source, 100);

183

```

184

185

## Error Handling

186

187

### Source Validation

188

189

```java

190

public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {

191

this.source = requireNonNull(source);

192

193

// Ensure source is serializable for Flink

194

if (!(source instanceof Serializable)) {

195

throw new IllegalArgumentException("source must be java.io.Serializable");

196

}

197

198

// Validate rate parameter

199

if (elementsPerSecond < 1) {

200

throw new IllegalArgumentException("'elements per second' must be positive and not zero");

201

}

202

203

// Configure rate limiting parameters...

204

}

205

```

206

207

### Interrupt Handling

208

209

```java

210

try {

211

Thread.sleep(sleepBatchTime - elapsed);

212

} catch (InterruptedException e) {

213

// Restore interrupt flag and proceed

214

Thread.currentThread().interrupt();

215

}

216

```

217

218

## Performance Characteristics

219

220

### Memory Usage

221

- **Minimal State**: Only tracks timing information and batch counters

222

- **No Buffering**: Wraps existing iterator without copying data

223

- **Serializable**: Can be distributed across Flink cluster nodes

224

225

### Accuracy

226

- **Millisecond Precision**: Uses `System.currentTimeMillis()` for timing

227

- **Batch Optimization**: Groups elements for better performance at high rates

228

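- **Drift Correction**: Sleeps only for the part of each batch window not already consumed by processing (`sleepBatchTime - elapsed`), and skips the sleep entirely when the window has already elapsed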

229

230

### Throughput Ranges

231

- **High Rate (≥100/sec)**: Batch-based processing for efficiency

232

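- **Low Rate (1-99/sec)**: Per-element timing with a delay of `1000 / elementsPerSecond` ms; this is integer division, so rates that do not divide 1000 evenly (e.g. 3/sec → 333ms) are emitted slightly faster than requested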

233

- **Invalid Rate (<1/sec)**: Throws IllegalArgumentException

234

235

## Integration with Flink Sources

236

237

### Custom Source Function

238

239

```java

240

public class ThrottledSourceFunction<T> implements SourceFunction<T> {

241

private final ThrottledIterator<T> throttledIterator;

242

private volatile boolean isRunning = true;

243

244

public ThrottledSourceFunction(Iterator<T> source, long elementsPerSecond) {

245

this.throttledIterator = new ThrottledIterator<>(source, elementsPerSecond);

246

}

247

248

@Override

249

public void run(SourceContext<T> ctx) throws Exception {

250

while (isRunning && throttledIterator.hasNext()) {

251

synchronized (ctx.getCheckpointLock()) {

252

ctx.collect(throttledIterator.next());

253

}

254

}

255

}

256

257

@Override

258

public void cancel() {

259

isRunning = false;

260

}

261

}

262

```

263

264

### Source Builder Pattern

265

266

```java

267

public static <T> DataStream<T> createThrottledStream(

268

StreamExecutionEnvironment env,

269

Iterator<T> data,

270

long elementsPerSecond) {

271

272

return env.addSource(new ThrottledSourceFunction<>(data, elementsPerSecond));

273

}

274

275

// Usage

276

DataStream<String> throttledStream = createThrottledStream(

277

env,

278

sampleData.iterator(),

279

50 // 50 elements per second

280

);

281

```

282

283

## Related Utilities

284

285

While ThrottledIterator is the primary utility class in this package, it's commonly used with other data generation utilities:

286

287

### Sample Data Providers

288

- **WindowJoinSampleData**: Sample data for join examples

289

- **TwitterExampleData**: Sample tweet data for offline testing

290

- **TopSpeedWindowingExampleData**: Sample car telemetry data

291

- **SessionWindowingData**: Sample clickstream data

292

293

### Usage with Sample Data

294

295

```java

296

// Throttled join data

297

ThrottledIterator<Tuple3<String, String, Long>> orangeData =

298

new ThrottledIterator<>(

299

Arrays.asList(WindowJoinSampleData.ORANGE_DATA).iterator(),

300

elementsPerSecond

301

);

302

303

// Throttled windowing data

304

ThrottledIterator<Tuple4<Integer, Integer, Double, Long>> carData =

305

new ThrottledIterator<>(

306

Arrays.asList(TopSpeedWindowingExampleData.CAR_DATA).iterator(),

307

carsPerSecond

308

);

309

```

310

311

## Dependencies

312

313

```xml

314

<dependency>

315

<groupId>org.apache.flink</groupId>

316

<artifactId>flink-streaming-java_2.10</artifactId>

317

<version>1.3.3</version>

318

</dependency>

319

```

320

321

## Required Imports

322

323

```java

324

import java.io.Serializable;

325

import java.util.Iterator;

326

import static java.util.Objects.requireNonNull;

327

```