or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-collection.mdindex.mdmetrics-testing.mdsecurity-testing.mdtest-data-providers.mdtest-environments.md

data-collection.mddocs/

# Data Collection and Sources

Utilities for feeding controlled, finite input into streaming tests and for collecting a job's output so that tests can assert on it. `FiniteTestSource` emits a fixed set of elements in step with checkpoints. `StreamCollector` gathers every element of a `DataStream` once the job has run.

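## Capabilities

### Finite Test Source

`FiniteTestSource` is a controlled test source. It emits its elements in cycles, synchronized with completed checkpoints, which suits tests that need deterministic input.

Because the source waits for checkpoint completion between cycles, enable checkpointing on the environment when using it (for example `env.enableCheckpointing(100)`). Without completed checkpoints the source never finishes.

The same elements are emitted again in later cycles, so downstream operators can see each element more than once. Tests should therefore assert on the distinct values rather than on an exact element count.

```java { .api }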

/**

12

* Test source that emits elements in cycles with checkpoint synchronization

13

* @param <T> Type of elements to emit

14

*/

15

public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {

16

/**

17

* Create source with vararg elements

18

* @param elements Elements to emit

19

*/

20

public FiniteTestSource(T... elements);

21

22

/**

23

* Create source with iterable elements

24

* @param elements Elements to emit

25

*/

26

public FiniteTestSource(Iterable<T> elements);

27

28

/**

29

* Create source with exit condition and timeout

30

* @param exitCondition Supplier that returns true when source should stop

31

* @param timeoutMs Timeout in milliseconds

32

* @param elements Elements to emit

33

*/

34

public FiniteTestSource(BooleanSupplier exitCondition, long timeoutMs, Iterable<T> elements);

35

36

/**

37

* Create source with exit condition

38

* @param exitCondition Supplier that returns true when source should stop

39

* @param elements Elements to emit

40

*/

41

public FiniteTestSource(BooleanSupplier exitCondition, Iterable<T> elements);

42

43

/**

44

* Main source execution method

45

* @param ctx Source context for emitting elements

46

*/

47

public void run(SourceContext<T> ctx) throws Exception;

48

49

/**

50

* Cancels the source

51

*/

52

public void cancel();

53

54

/**

55

* Checkpoint completion notification

56

* @param checkpointId ID of completed checkpoint

57

*/

58

public void notifyCheckpointComplete(long checkpointId) throws Exception;

59

60

/**

61

* Checkpoint abortion notification

62

* @param checkpointId ID of aborted checkpoint

63

*/

64

public void notifyCheckpointAborted(long checkpointId) throws Exception;

65

}

66

```

**Usage Examples:**

```java

import org.apache.flink.streaming.util.FiniteTestSource;

72

import org.apache.flink.streaming.api.datastream.DataStream;

73

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

74

75

@Test

76

public void testFiniteSource() throws Exception {

77

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

78

79

// Simple finite source with vararg elements

80

DataStream<String> stream1 = env.addSource(

81

new FiniteTestSource<>("hello", "world", "flink")

82

);

83

84

// Source with collection of elements

85

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

86

DataStream<Integer> stream2 = env.addSource(

87

new FiniteTestSource<>(numbers)

88

);

89

90

// Source with exit condition - stops after 10 seconds or when condition is met

91

AtomicBoolean shouldStop = new AtomicBoolean(false);

92

DataStream<String> stream3 = env.addSource(

93

new FiniteTestSource<>(

94

() -> shouldStop.get(),

95

10000L,

96

Arrays.asList("data1", "data2", "data3")

97

)

98

);

99

100

// Process streams

101

stream1.print("Stream1");

102

stream2.map(x -> x * 2).print("Stream2");

103

stream3.print("Stream3");

104

105

// In another thread, signal to stop after some processing

106

Timer timer = new Timer();

107

timer.schedule(new TimerTask() {

108

@Override

109

public void run() {

110

shouldStop.set(true);

111

}

112

}, 5000);

113

114

env.execute("Finite Source Test");

115

}

116

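```

### Stream Collector

`StreamCollector` is a JUnit rule that collects every element of a `DataStream` so that a test can validate it. Register it as a `@Rule`, call `collect(...)` before executing the job, and read the returned future after `env.execute(...)` returns.

```java { .api }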

/**

124

* JUnit rule for collecting all elements from a DataStream for testing

125

*/

126

public class StreamCollector extends ExternalResource {

127

/**

128

* Collects all elements from the stream

129

* @param stream DataStream to collect from

130

* @return CompletableFuture that completes with collected elements

131

*/

132

public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream);

133

}

134

```

**Usage Example:**

```java

import org.apache.flink.streaming.util.StreamCollector;

140

import org.apache.flink.streaming.api.datastream.DataStream;

141

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

142

import org.junit.Rule;

143

144

public class StreamCollectionTest {

145

146

@Rule

147

public StreamCollector streamCollector = new StreamCollector();

148

149

@Test

150

public void testStreamCollection() throws Exception {

151

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

152

153

// Create test stream

154

DataStream<String> input = env.fromElements("apple", "banana", "cherry");

155

DataStream<String> processed = input.map(String::toUpperCase);

156

157

// Collect results

158

CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

159

160

// Execute the job

161

env.execute("Stream Collection Test");

162

163

// Get results and validate

164

Collection<String> results = resultFuture.get(10, TimeUnit.SECONDS);

165

assertEquals(3, results.size());

166

assertTrue(results.contains("APPLE"));

167

assertTrue(results.contains("BANANA"));

168

assertTrue(results.contains("CHERRY"));

169

}

170

171

@Test

172

public void testStreamWithFiltering() throws Exception {

173

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

174

175

// Create test stream with filtering

176

DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

177

DataStream<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);

178

179

// Collect filtered results

180

CompletableFuture<Collection<Integer>> resultFuture = streamCollector.collect(evenNumbers);

181

182

env.execute("Filtering Test");

183

184

// Validate filtered results

185

Collection<Integer> results = resultFuture.get();

186

assertEquals(5, results.size());

187

assertTrue(results.contains(2));

188

assertTrue(results.contains(4));

189

assertTrue(results.contains(6));

190

assertTrue(results.contains(8));

191

assertTrue(results.contains(10));

192

}

193

}

194

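```

## Advanced Data Collection Patterns

Flink serializes sources and other user functions when it builds a job, even for a local environment, so the running job operates on copies of them. Flags or counters shared between a test and its functions therefore need to live in `static` fields. A captured local variable is copied along with the function and never reflects the job's updates.

### Testing with Multiple Sources

```java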

@Test

202

public void testMultipleSources() throws Exception {

203

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

204

205

// Create multiple finite sources

206

DataStream<String> source1 = env.addSource(

207

new FiniteTestSource<>("a1", "a2", "a3")

208

);

209

210

DataStream<String> source2 = env.addSource(

211

new FiniteTestSource<>("b1", "b2", "b3")

212

);

213

214

// Union the sources

215

DataStream<String> combined = source1.union(source2);

216

DataStream<String> processed = combined.map(s -> "processed-" + s);

217

218

// Collect results

219

CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

220

221

env.execute("Multiple Sources Test");

222

223

// Validate combined results

224

Collection<String> results = resultFuture.get();

225

assertEquals(6, results.size());

226

assertTrue(results.contains("processed-a1"));

227

assertTrue(results.contains("processed-b1"));

228

}

229

```

### Testing with Windowing

```java

import org.apache.flink.streaming.api.windowing.time.Time;

235

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

236

237

@Test

238

public void testWindowedStream() throws Exception {

239

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

240

241

// Create source with timestamped data

242

DataStream<Tuple2<String, Integer>> input = env.addSource(

243

new FiniteTestSource<>(

244

Tuple2.of("key1", 1),

245

Tuple2.of("key1", 2),

246

Tuple2.of("key2", 3),

247

Tuple2.of("key1", 4),

248

Tuple2.of("key2", 5)

249

)

250

);

251

252

// Apply windowing and aggregation

253

DataStream<Tuple2<String, Integer>> windowed = input

254

.keyBy(t -> t.f0)

255

.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))

256

.sum(1);

257

258

// Collect windowed results

259

CompletableFuture<Collection<Tuple2<String, Integer>>> resultFuture =

260

streamCollector.collect(windowed);

261

262

env.execute("Windowed Stream Test");

263

264

// Validate aggregated results

265

Collection<Tuple2<String, Integer>> results = resultFuture.get();

266

assertFalse(results.isEmpty());

267

268

// Check that aggregation occurred

269

boolean foundKey1Sum = results.stream()

270

.anyMatch(t -> "key1".equals(t.f0) && t.f1 > 1);

271

assertTrue("Expected aggregated key1 values", foundKey1Sum);

272

}

273

```

### Testing with Checkpointing

```java

@Test

279

public void testSourceWithCheckpointing() throws Exception {

280

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

281

282

// Enable checkpointing

283

env.enableCheckpointing(100);

284

285

// Create source that will emit elements and handle checkpoints

286

List<String> elements = Arrays.asList("checkpoint1", "checkpoint2", "checkpoint3");

287

AtomicInteger checkpointCount = new AtomicInteger(0);

288

289

FiniteTestSource<String> source = new FiniteTestSource<String>(elements) {

290

@Override

291

public void notifyCheckpointComplete(long checkpointId) throws Exception {

292

super.notifyCheckpointComplete(checkpointId);

293

checkpointCount.incrementAndGet();

294

}

295

};

296

297

DataStream<String> stream = env.addSource(source);

298

DataStream<String> processed = stream.map(s -> "checkpoint-processed-" + s);

299

300

// Collect results

301

CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(processed);

302

303

env.execute("Checkpointing Test");

304

305

// Validate results and checkpointing

306

Collection<String> results = resultFuture.get();

307

assertEquals(3, results.size());

308

assertTrue("Checkpoints should have been triggered", checkpointCount.get() > 0);

309

}

310

```

### Testing Source Cancellation

```java

@Test

316

public void testSourceCancellation() throws Exception {

317

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

318

319

// Create source with exit condition for controlled cancellation

320

AtomicBoolean cancelled = new AtomicBoolean(false);

321

List<String> elements = Arrays.asList("cancel1", "cancel2", "cancel3");

322

323

FiniteTestSource<String> source = new FiniteTestSource<String>(

324

() -> cancelled.get(),

325

5000L, // 5 second timeout

326

elements

327

) {

328

@Override

329

public void cancel() {

330

super.cancel();

331

cancelled.set(true);

332

}

333

};

334

335

DataStream<String> stream = env.addSource(source);

336

337

// Set up result collection

338

CompletableFuture<Collection<String>> resultFuture = streamCollector.collect(stream);

339

340

// Cancel after short delay

341

Timer timer = new Timer();

342

timer.scheduleAtFixedRate(new TimerTask() {

343

@Override

344

public void run() {

345

cancelled.set(true);

346

}

347

}, 1000, 100);

348

349

env.execute("Cancellation Test");

350

351

// Validate that source was properly cancelled

352

Collection<String> results = resultFuture.get();

353

assertTrue("Source should have been cancelled", cancelled.get());

354

}

355

```