or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-collection.md, index.md, metrics-testing.md, security-testing.md, test-data-providers.md, test-environments.md

docs/test-data-providers.md

0

# Test Data Providers

1

2

Pre-built test datasets for common algorithms and use cases, providing consistent test data for PageRank, KMeans, WordCount, and other standard Flink examples with validation utilities.

3

4

## Capabilities

5

6

### Word Count Data

7

8

`WordCountData` provides test data for word counting algorithms with expected results.

9

10

```java { .api }

11

/**

12

* Test data for WordCount programs

13

*/

14

public class WordCountData {

15

/**

16

* Goethe Faust text for word counting

17

*/

18

public static final String TEXT;

19

20

/**

21

* Expected word counts

22

*/

23

public static final String COUNTS;

24

25

/**

26

* Expected streaming word count tuples

27

*/

28

public static final String STREAMING_COUNTS_AS_TUPLES;

29

30

/**

31

* Expected word count tuples

32

*/

33

public static final String COUNTS_AS_TUPLES;

34

}

35

```

36

37

**Usage Example:**

38

39

```java

40

import org.apache.flink.test.testdata.WordCountData;

41

import org.apache.flink.api.java.DataSet;

42

import org.apache.flink.api.java.ExecutionEnvironment;

43

44

@Test

45

public void testWordCount() throws Exception {

46

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

47

48

// Use test data

49

DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\\n"));

50

51

// Implement word count

52

DataSet<Tuple2<String, Integer>> counts = text

53

.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {

54

for (String word : line.toLowerCase().split("\\W+")) {

55

if (word.length() > 0) {

56

out.collect(new Tuple2<>(word, 1));

57

}

58

}

59

})

60

.groupBy(0)

61

.sum(1);

62

63

List<Tuple2<String, Integer>> result = counts.collect();

64

65

// Validate against expected results

66

String expectedCounts = WordCountData.COUNTS;

67

// Compare result with expectedCounts...

68

}

69

```

70

71

### K-Means Data

72

73

`KMeansData` provides test datasets for K-means clustering algorithms with multiple dimensional variants.

74

75

```java { .api }

76

/**

77

* Test data for KMeans programs

78

*/

79

public class KMeansData {

80

// 3D Data Constants

81

/**

82

* 3D data points for clustering

83

*/

84

public static final String DATAPOINTS;

85

86

/**

87

* Initial cluster centers for 3D data

88

*/

89

public static final String INITIAL_CENTERS;

90

91

/**

92

* Centers after one iteration for 3D data

93

*/

94

public static final String CENTERS_AFTER_ONE_STEP;

95

96

// Additional iteration results available...

97

98

// 2D Data Constants

99

/**

100

* 2D data points for clustering

101

*/

102

public static final String DATAPOINTS_2D;

103

104

/**

105

* Initial 2D cluster centers

106

*/

107

public static final String INITIAL_CENTERS_2D;

108

109

// Additional 2D iteration results available...

110

111

/**

112

* Validates K-means results with delta tolerance

113

* @param expectedResult Expected result string

114

* @param result Actual result list

115

* @param maxDelta Maximum allowed delta for floating point comparison

116

*/

117

public static void checkResultsWithDelta(

118

String expectedResult,

119

List<String> result,

120

double maxDelta

121

) throws Exception;

122

}

123

```

124

125

**Usage Example:**

126

127

```java

128

import org.apache.flink.test.testdata.KMeansData;

129

import org.apache.flink.api.java.DataSet;

130

import org.apache.flink.api.java.ExecutionEnvironment;

131

132

@Test

133

public void testKMeans() throws Exception {

134

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

135

136

// Load test data points

137

DataSet<String> dataPoints = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));

138

DataSet<String> initialCenters = env.fromElements(KMeansData.INITIAL_CENTERS.split("\\n"));

139

140

// Parse data points into Point objects

141

DataSet<Point> points = dataPoints.map(line -> {

142

String[] coords = line.split(" ");

143

return new Point(

144

Double.parseDouble(coords[0]),

145

Double.parseDouble(coords[1]),

146

Double.parseDouble(coords[2])

147

);

148

});

149

150

DataSet<Centroid> centroids = initialCenters.map(line -> {

151

String[] coords = line.split(" ");

152

return new Centroid(

153

Integer.parseInt(coords[0]),

154

Double.parseDouble(coords[1]),

155

Double.parseDouble(coords[2]),

156

Double.parseDouble(coords[3])

157

);

158

});

159

160

// Run one iteration of K-means

161

DataSet<Centroid> newCentroids = points

162

.map(new SelectNearestCenter()).withBroadcastSet(centroids, "centroids")

163

.groupBy(0)

164

.reduceGroup(new CentroidAccumulator());

165

166

List<String> result = newCentroids.map(c -> c.toString()).collect();

167

168

// Validate with expected results and delta tolerance

169

KMeansData.checkResultsWithDelta(

170

KMeansData.CENTERS_AFTER_ONE_STEP,

171

result,

172

0.01

173

);

174

}

175

```

176

177

### PageRank Data

178

179

`PageRankData` provides test data for PageRank algorithms with graph structures and expected rankings.

180

181

```java { .api }

182

/**

183

* Test data for PageRank programs

184

*/

185

public class PageRankData {

186

/**

187

* Number of vertices in test graph

188

*/

189

public static final int NUM_VERTICES = 5;

190

191

/**

192

* Vertex data for PageRank

193

*/

194

public static final String VERTICES;

195

196

/**

197

* Edge data for PageRank graph

198

*/

199

public static final String EDGES;

200

201

/**

202

* Expected ranks after 3 iterations

203

*/

204

public static final String RANKS_AFTER_3_ITERATIONS;

205

206

/**

207

* Expected ranks after convergence with epsilon 0.0001

208

*/

209

public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;

210

}

211

```

212

213

**Usage Example:**

214

215

```java

216

import org.apache.flink.test.testdata.PageRankData;

217

import org.apache.flink.api.java.DataSet;

218

import org.apache.flink.api.java.ExecutionEnvironment;

219

220

@Test

221

public void testPageRank() throws Exception {

222

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

223

224

// Load vertices and edges

225

DataSet<String> verticesText = env.fromElements(PageRankData.VERTICES.split("\\n"));

226

DataSet<String> edgesText = env.fromElements(PageRankData.EDGES.split("\\n"));

227

228

// Parse into Vertex and Edge objects

229

DataSet<Tuple2<Long, Double>> vertices = verticesText.map(line -> {

230

String[] parts = line.split("\\s+");

231

return new Tuple2<>(Long.parseLong(parts[0]), Double.parseDouble(parts[1]));

232

});

233

234

DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {

235

String[] parts = line.split("\\s+");

236

return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));

237

});

238

239

// Run PageRank for 3 iterations

240

DataSet<Tuple2<Long, Double>> ranks = runPageRank(vertices, edges, 3);

241

242

List<String> result = ranks.map(r -> r.f0 + " " + r.f1).collect();

243

244

// Validate against expected results

245

String expected = PageRankData.RANKS_AFTER_3_ITERATIONS;

246

// Compare results...

247

}

248

```

249

250

### Connected Components Data

251

252

`ConnectedComponentsData` provides test data for connected components algorithms.

253

254

```java { .api }

255

/**

256

* Test data for Connected Components programs

257

*/

258

public class ConnectedComponentsData {

259

/**

260

* Generates enumerated vertices

261

* @param num Number of vertices to generate

262

* @return String with vertex data

263

*/

264

public static String getEnumeratingVertices(int num);

265

266

/**

267

* Generates odd/even connected edges

268

* @param numVertices Number of vertices

269

* @param numEdges Number of edges to generate

270

* @param seed Random seed for reproducible results

271

* @return String with edge data

272

*/

273

public static String getRandomOddEvenEdges(int numVertices, int numEdges, long seed);

274

275

/**

276

* Validates odd/even connected component results

277

* @param result BufferedReader with results

278

*/

279

public static void checkOddEvenResult(BufferedReader result) throws Exception;

280

281

/**

282

* Validates tuple-based connected component results

283

* @param result List of component tuples

284

*/

285

public static void checkOddEvenResult(List<Tuple2<Long, Long>> result) throws Exception;

286

}

287

```

288

289

**Usage Example:**

290

291

```java

292

import org.apache.flink.test.testdata.ConnectedComponentsData;

293

294

@Test

295

public void testConnectedComponents() throws Exception {

296

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

297

298

// Generate test data

299

String verticesData = ConnectedComponentsData.getEnumeratingVertices(10);

300

String edgesData = ConnectedComponentsData.getRandomOddEvenEdges(10, 15, 12345L);

301

302

DataSet<String> verticesText = env.fromElements(verticesData.split("\\n"));

303

DataSet<String> edgesText = env.fromElements(edgesData.split("\\n"));

304

305

// Parse vertices and edges

306

DataSet<Tuple2<Long, Long>> vertices = verticesText.map(line -> {

307

String[] parts = line.split("\\s+");

308

return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[0]));

309

});

310

311

DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {

312

String[] parts = line.split("\\s+");

313

return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));

314

});

315

316

// Run connected components algorithm

317

DataSet<Tuple2<Long, Long>> components = runConnectedComponents(vertices, edges);

318

319

List<Tuple2<Long, Long>> result = components.collect();

320

321

// Validate results

322

ConnectedComponentsData.checkOddEvenResult(result);

323

}

324

```

325

326

### Transitive Closure Data

327

328

`TransitiveClosureData` provides test data for transitive closure algorithms.

329

330

```java { .api }

331

/**

332

* Test data for Transitive Closure programs

333

*/

334

public class TransitiveClosureData {

335

/**

336

* Validates odd/even transitive closure results

337

* @param result BufferedReader with results

338

*/

339

public static void checkOddEvenResult(BufferedReader result) throws Exception;

340

}

341

```

342

343

### Web Log Analysis Data

344

345

`WebLogAnalysisData` provides test data for web log analysis programs.

346

347

```java { .api }

348

/**

349

* Test data for Web Log Analysis programs

350

*/

351

public class WebLogAnalysisData {

352

/**

353

* Document data for analysis

354

*/

355

public static final String DOCS;

356

357

/**

358

* Ranking data

359

*/

360

public static final String RANKS;

361

362

/**

363

* Visit log data

364

*/

365

public static final String VISITS;

366

367

/**

368

* Expected analysis results (NOTE: the constant is named {@code EXCEPTED_RESULT} — this looks like a typo for "EXPECTED", but it matches the actual identifier in Flink's WebLogAnalysisData API; verify against the Flink source before renaming)

369

*/

370

public static final String EXCEPTED_RESULT;

371

}

372

```

373

374

### Enum Triangle Data

375

376

`EnumTriangleData` provides test data for triangle enumeration algorithms.

377

378

```java { .api }

379

/**

380

* Test data for Enum Triangle programs

381

*/

382

public class EnumTriangleData {

383

/**

384

* Graph edges for triangle enumeration

385

*/

386

public static final String EDGES;

387

388

/**

389

* Expected triangles by ID

390

*/

391

public static final String TRIANGLES_BY_ID;

392

393

/**

394

* Expected triangles by degree

395

*/

396

public static final String TRIANGLES_BY_DEGREE;

397

}

398

```

399

400

## Advanced Data Provider Usage

401

402

### Custom Data Validation

403

404

```java

405

import org.apache.flink.test.util.TestBaseUtils;

406

407

@Test

408

public void testCustomDataValidation() throws Exception {

409

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

410

411

// Use multiple data providers

412

DataSet<String> wordCountInput = env.fromElements(WordCountData.TEXT.split("\\n"));

413

DataSet<String> kmeansInput = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));

414

415

// Process data

416

DataSet<String> wordCounts = wordCountInput

417

.flatMap(new WordCountTokenizer())

418

.groupBy(0)

419

.sum(1)

420

.map(tuple -> tuple.f0 + " " + tuple.f1);

421

422

// Collect and validate using TestBaseUtils

423

List<String> results = wordCounts.collect();

424

425

// Use utility methods for validation

426

TestBaseUtils.compareResultAsText(results, WordCountData.COUNTS);

427

}

428

```

429

430

### Combining Multiple Test Datasets

431

432

```java

433

@Test

434

public void testMultipleDatasets() throws Exception {

435

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

436

437

// Combine different data sources

438

DataSet<String> combinedData = env.fromElements(

439

WordCountData.TEXT.split("\\n")

440

).union(

441

env.fromElements("Additional test data", "More test content")

442

);

443

444

// Process combined data

445

DataSet<Integer> wordLengths = combinedData

446

.flatMap((String line, Collector<String> out) -> {

447

for (String word : line.split("\\s+")) {

448

out.collect(word);

449

}

450

})

451

.map(word -> word.length());

452

453

List<Integer> results = wordLengths.collect();

454

assertFalse("Should have results", results.isEmpty());

455

}

456

```

457

458

### Testing with Expected Results Validation

459

460

```java

461

@Test

462

public void testWithExpectedResults() throws Exception {

463

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

464

465

// Use PageRank data

466

DataSet<String> vertices = env.fromElements(PageRankData.VERTICES.split("\\n"));

467

DataSet<String> edges = env.fromElements(PageRankData.EDGES.split("\\n"));

468

469

// Run algorithm and collect results

470

List<String> actualResults = runPageRankAlgorithm(vertices, edges, 3);

471

472

// Parse expected results for comparison

473

String[] expectedLines = PageRankData.RANKS_AFTER_3_ITERATIONS.split("\\n");

474

List<String> expectedResults = Arrays.asList(expectedLines);

475

476

// Use TestBaseUtils for comparison with tolerance

477

TestBaseUtils.compareResultCollections(

478

expectedResults,

479

actualResults,

480

new PageRankComparator(0.01) // Custom comparator with delta tolerance

481

);

482

}

483

```

484

485

### Performance Testing with Large Datasets

486

487

```java

488

@Test

489

public void testPerformanceWithLargeDataset() throws Exception {

490

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

491

env.setParallelism(4);

492

493

// Generate larger dataset based on KMeans data

494

List<String> largeDataset = new ArrayList<>();

495

String[] basePoints = KMeansData.DATAPOINTS.split("\\n");

496

497

// Replicate data points for performance testing

498

for (int i = 0; i < 1000; i++) {

499

for (String point : basePoints) {

500

largeDataset.add(point);

501

}

502

}

503

504

DataSet<String> largePointSet = env.fromCollection(largeDataset);

505

506

long startTime = System.currentTimeMillis();

507

508

// Process large dataset

509

List<String> results = largePointSet

510

.map(new PointProcessor())

511

.collect();

512

513

long endTime = System.currentTimeMillis();

514

long processingTime = endTime - startTime;

515

516

// Validate performance and results

517

assertFalse("Should have processed data", results.isEmpty());

518

assertTrue("Processing should complete within reasonable time",

519

processingTime < 30000); // 30 seconds

520

}

521

```