or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md · metrics-testing.md · minicluster-management.md · result-verification.md · specialized-connectors.md · test-data-sources.md · test-environments.md · validation-utilities.md

docs/test-data-sources.md

0

# Test Data Sources

1

2

This page covers two kinds of test input: finite test sources, which emit a controlled set of elements in coordination with checkpoints, and sample datasets for common algorithms such as PageRank, K-means, and Connected Components. Both provide deterministic inputs for reliable, repeatable tests.

3

4

## Capabilities

5

6

### Finite Test Source

7

8

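Source function that emits a finite set of elements with coordinated checkpointing and configurable exit conditions, designed for deterministic testing scenarios. Note (verify against the Flink version you use): in upstream Apache Flink this source emits the elements, waits for checkpoints to complete, and then emits the same elements a second time before exiting, so checkpointing must be enabled on the environment and downstream operators may observe each element twice.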

9

10

```java { .api }

11

public class FiniteTestSource<T> implements SourceFunction<T> {

12

public FiniteTestSource(T... elements);

13

public FiniteTestSource(Iterable<T> elements);

14

public FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);

15

public FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);

16

}

17

```

18

19

#### Basic Usage

20

21

```java

22

import org.apache.flink.streaming.util.FiniteTestSource;

23

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

24

25

// Create source with varargs

26

FiniteTestSource<Integer> numberSource = new FiniteTestSource<>(1, 2, 3, 4, 5);

27

28

// Create source with collection

29

List<String> words = Arrays.asList("hello", "world", "flink");

30

FiniteTestSource<String> wordSource = new FiniteTestSource<>(words);

31

32

// Use in streaming job

33

StreamExecutionEnvironment env = getTestEnvironment();

34

env.addSource(numberSource)

35

.map(x -> x * 2)

36

.print();

37

```

38

39

#### Advanced Usage with Exit Conditions

40

41

```java

42

import java.util.function.BooleanSupplier;

43

44

// Create source with custom exit condition
// (assumes startTime was captured earlier, e.g. long startTime = System.currentTimeMillis();)

45

BooleanSupplier exitCondition = () -> System.currentTimeMillis() > startTime + 5000;

46

List<String> data = Arrays.asList("a", "b", "c", "d", "e");

47

48

FiniteTestSource<String> conditionalSource =

49

new FiniteTestSource<>(exitCondition, data);

50

51

// Create source with timeout

52

FiniteTestSource<String> timedSource =

53

new FiniteTestSource<>(exitCondition, 10000L, data);

54

```

55

56

### Test Result Collection

57

58

Utilities for collecting and managing test results from streaming jobs with thread-safe collection mechanisms.

59

60

```java { .api }

61

public class TestListResultSink<T> extends RichSinkFunction<T> {

62

// Sink that collects results in a thread-safe list

63

}

64

65

public class TestListWrapper<T> {

66

// Wrapper for managing test result collections

67

}

68

```

69

70

#### Usage Example

71

72

```java

73

import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

74

75

// Create result sink

76

TestListResultSink<String> resultSink = new TestListResultSink<>();

77

78

// Use in streaming job

79

env.fromElements("a", "b", "c")

80

.map(String::toUpperCase)

81

.addSink(resultSink);

82

83

env.execute("Test Job");

84

85

// Retrieve results

86

List<String> results = resultSink.getResult();

87

assertEquals(Arrays.asList("A", "B", "C"), results);

88

```

89

90

## Sample Algorithm Data

91

92

Predefined datasets for testing common graph and machine learning algorithms, providing both input data and expected results for validation.

93

94

### Word Count Data

95

96

Sample text data and expected word count results for testing text processing algorithms.

97

98

```java { .api }

99

public class WordCountData {

100

public static final String TEXT;

101

public static final String EXPECTED_RESULT;

102

}

103

```

104

105

#### Usage Example

106

107

```java

108

import org.apache.flink.test.testdata.WordCountData;

109

110

// Use predefined text data

111

env.fromElements(WordCountData.TEXT.split("\\s+"))

112

.flatMap(new WordCountMapper())

113

.keyBy(0)

114

.sum(1)

115

.print();

116

117

// Verify against expected results

118

String actualResult = collectJobOutput();

119

TestBaseUtils.compareResultsByLinesInMemory(

120

WordCountData.EXPECTED_RESULT, actualResult);

121

```

122

123

### Connected Components Data

124

125

Graph data for testing connected components algorithms with vertices, edges, and expected component assignments.

126

127

```java { .api }

128

public class ConnectedComponentsData {

129

public static final String VERTEX_DATA;

130

public static final String EDGE_DATA;

131

public static final String EXPECTED_RESULT;

132

}

133

```

134

135

#### Usage Example

136

137

```java

138

import org.apache.flink.test.testdata.ConnectedComponentsData;

139

140

// Create graph from test data

141

DataSet<Vertex> vertices = env.fromCollection(parseVertices(ConnectedComponentsData.VERTEX_DATA));

142

DataSet<Edge> edges = env.fromCollection(parseEdges(ConnectedComponentsData.EDGE_DATA));

143

144

// Run connected components algorithm

145

DataSet<Vertex> result = vertices.runOperation(new ConnectedComponents<>(maxIterations))

146

.withBroadcastSet(edges, "edges");

147

148

// Verify results

149

compareWithExpected(result, ConnectedComponentsData.EXPECTED_RESULT);

150

```

151

152

### K-Means Clustering Data

153

154

Sample points and expected cluster assignments for testing K-means clustering implementations.

155

156

```java { .api }

157

public class KMeansData {

158

public static final String DATAPOINTS;

159

public static final String INITIAL_CENTROIDS;

160

public static final String EXPECTED_RESULT;

161

}

162

```

163

164

### PageRank Data

165

166

Web graph data with vertices, edges, and expected PageRank scores for testing PageRank algorithm implementations.

167

168

```java { .api }

169

public class PageRankData {

170

public static final String VERTICES;

171

public static final String EDGES;

172

public static final String EXPECTED_RESULT;

173

}

174

```

175

176

### Triangle Enumeration Data

177

178

Graph data for testing triangle enumeration algorithms in social network analysis.

179

180

```java { .api }

181

public class EnumTriangleData {

182

public static final String EDGES;

183

public static final String EXPECTED_RESULT;

184

}

185

```

186

187

### Transitive Closure Data

188

189

Graph data for testing transitive closure algorithms with expected reachability results.

190

191

```java { .api }

192

public class TransitiveClosureData {

193

public static final String EDGES;

194

public static final String EXPECTED_RESULT;

195

}

196

```

197

198

### Web Log Analysis Data

199

200

Sample web server log data and expected analysis results for testing log processing applications.

201

202

```java { .api }

203

public class WebLogAnalysisData {

204

public static final String LOG_DATA;

205

public static final String EXPECTED_RESULT;

206

}

207

```

208

209

## Usage Patterns

210

211

### Deterministic Testing

212

213

Using finite sources for predictable test execution with guaranteed completion.

214

215

```java

216

@Test

217

void testStreamingTransformation() throws Exception {

218

// Create deterministic source

219

List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5);

220

FiniteTestSource<Integer> source = new FiniteTestSource<>(inputData);

221

222

// Collect results

223

TestListResultSink<Integer> sink = new TestListResultSink<>();

224

225

// Build and execute job

226

env.addSource(source)

227

.map(x -> x * x)

228

.addSink(sink);

229

230

env.execute("Square Numbers");

231

232

// Verify results

233

List<Integer> expected = Arrays.asList(1, 4, 9, 16, 25);

234

assertEquals(expected, sink.getResult());

235

}

236

```

237

238

### Algorithm Validation

239

240

Using predefined datasets to validate algorithm implementations.

241

242

```java

243

@Test

244

void testPageRankAlgorithm() throws Exception {

245

// Use predefined PageRank test data

246

DataSet<Vertex> vertices = parseVertices(PageRankData.VERTICES);

247

DataSet<Edge> edges = parseEdges(PageRankData.EDGES);

248

249

// Run PageRank algorithm

250

DataSet<Vertex> result = runPageRank(vertices, edges, 10);

251

252

// Verify against expected results

253

List<String> actualResults = result.collect();

254

TestBaseUtils.compareResultAsText(actualResults, PageRankData.EXPECTED_RESULT);

255

}

256

```

257

258

### Timeout-Based Testing

259

260

Using sources with timeout conditions for testing time-sensitive scenarios.

261

262

```java

263

@Test

264

void testWithTimeout() throws Exception {

265

long timeoutMs = 5000L;

266

BooleanSupplier timeoutCondition = () ->

267

System.currentTimeMillis() > startTime + timeoutMs;

268

269

List<String> data = generateLargeDataset();

270

FiniteTestSource<String> source =

271

new FiniteTestSource<>(timeoutCondition, timeoutMs, data);

272

273

// Job should complete within timeout

274

env.addSource(source)

275

.map(String::toUpperCase)

276

.print();

277

278

JobExecutionResult result = env.execute("Timeout Test");

279

assertTrue(result.getNetRuntime() < timeoutMs);

280

}

281

```

282

283

### Checkpoint Integration

284

285

Using finite sources with checkpoint coordination for testing fault tolerance.

286

287

```java

288

@Test

289

void testCheckpointingWithFiniteSource() throws Exception {

290

// Enable checkpointing

291

env.enableCheckpointing(1000);

292

293

// Create source that coordinates with checkpoints

294

FiniteTestSource<Long> source = new FiniteTestSource<>(1L, 2L, 3L, 4L, 5L);

295

296

env.addSource(source)

297

.keyBy(x -> x % 2)

298

.map(new StatefulMapper())

299

.print();

300

301

env.execute("Checkpoint Test");

302

303

// Verify checkpoints were created

304

// Implementation depends on your checkpoint verification strategy

305

}

306

```