or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-testing.md · connector-testing.md · core-testing.md · index.md · migration-testing.md · table-testing.md · test-environments.md

docs/test-environments.md

0

# Test Environments and Data Sources

1

2

Test execution environments, data sources, and utilities for creating controlled testing scenarios in Flink applications. These utilities enable comprehensive testing of streaming and batch applications with predictable test data and isolated execution contexts.

3

4

## Capabilities

5

6

### Test Environment Management

7

8

#### TestStreamEnvironment

9

10

Test-specific stream execution environment that provides controlled execution for testing streaming applications.

11

12

```java { .api }

13

/**

14

* Test-specific stream execution environment

15

* Extends StreamExecutionEnvironment with test-friendly defaults

16

*/

17

class TestStreamEnvironment extends StreamExecutionEnvironment {

18

/** Create a test stream environment with default parallelism of 1 */

19

static TestStreamEnvironment createTestEnvironment();

20

21

/** Create test environment with specific parallelism */

22

static TestStreamEnvironment createTestEnvironment(int parallelism);

23

}

24

```

25

26

#### MultipleProgramsTestBase

27

28

Base classes for tests that need to run multiple Flink programs in sequence.

29

30

```java { .api }

31

/**

32

* Base class for tests running multiple programs

33

* Provides clean execution environment for each program

34

*/

35

abstract class MultipleProgramsTestBase extends AbstractTestBase {

36

/** Get execution environment for current test */

37

ExecutionEnvironment getExecutionEnvironment();

38

39

/** Get stream execution environment for current test */

40

StreamExecutionEnvironment getStreamExecutionEnvironment();

41

}

42

43

/**

44

* JUnit 4 version of multiple programs test base

45

*/

46

abstract class MultipleProgramsTestBaseJUnit4 extends AbstractTestBaseJUnit4 {

47

ExecutionEnvironment getExecutionEnvironment();

48

StreamExecutionEnvironment getStreamExecutionEnvironment();

49

}

50

```

51

52

**Usage Examples:**

53

54

```java

55

import org.apache.flink.streaming.util.TestStreamEnvironment;

56

import org.apache.flink.test.util.MultipleProgramsTestBase;

57

58

// Using TestStreamEnvironment

59

TestStreamEnvironment env = TestStreamEnvironment.createTestEnvironment();

60

DataStream<String> source = env.fromElements("hello", "world");

61

// Configure and execute test pipeline

62

63

// Using MultipleProgramsTestBase

64

public class MyIntegrationTest extends MultipleProgramsTestBase {

65

@Test

66

public void testProgram1() {

67

StreamExecutionEnvironment env = getStreamExecutionEnvironment();

68

// Test first program

69

}

70

71

@Test

72

public void testProgram2() {

73

StreamExecutionEnvironment env = getStreamExecutionEnvironment();

74

// Test second program with fresh environment

75

}

76

}

77

```

78

79

### Test Data Sources

80

81

#### FiniteTestSource

82

83

Finite data source for streaming tests that emits a predetermined set of elements.

84

85

```java { .api }

86

/**

87

* Finite test data source for streaming tests

88

* Implements SourceFunction and CheckpointListener for complete lifecycle testing

89

*/

90

class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {

91

/** Create source with collection of elements */

92

FiniteTestSource(Collection<T> elements);

93

94

/** Create source with array of elements */

95

FiniteTestSource(T... elements);

96

97

/** Create source with elements and emission delay */

98

FiniteTestSource(Collection<T> elements, long delayBetweenElements);

99

100

void run(SourceContext<T> ctx) throws Exception;

101

void cancel();

102

void notifyCheckpointComplete(long checkpointId);

103

}

104

```

105

106

**Usage Examples:**

107

108

```java

109

import org.apache.flink.streaming.util.FiniteTestSource;

110

111

// Simple test source

112

FiniteTestSource<Integer> source = new FiniteTestSource<>(1, 2, 3, 4, 5);

113

DataStream<Integer> stream = env.addSource(source);

114

115

// Source with delay between elements

116

List<String> testData = Arrays.asList("a", "b", "c");

117

FiniteTestSource<String> delayedSource =

118

new FiniteTestSource<>(testData, 100); // 100ms between elements

119

DataStream<String> delayedStream = env.addSource(delayedSource);

120

```

121

122

### Test Data Sinks

123

124

#### TestListResultSink

125

126

Sink that collects streaming results in a list for easy verification in tests.

127

128

```java { .api }

129

/**

130

* Sink that collects results in a list for test verification

131

*/

132

class TestListResultSink<T> extends RichSinkFunction<T> {

133

/** Create sink with result collection */

134

TestListResultSink(List<T> resultList);

135

136

/** Create sink with thread-safe result collection */

137

static <T> TestListResultSink<T> createThreadSafe();

138

139

void invoke(T value, Context context);

140

141

/** Get collected results (thread-safe) */

142

List<T> getResults();

143

144

/** Clear collected results */

145

void clear();

146

}

147

```

148

149

**Usage Examples:**

150

151

```java

152

import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

153

154

// Collect results for verification

155

List<String> results = new ArrayList<>();

156

TestListResultSink<String> sink = new TestListResultSink<>(results);

157

158

DataStream<String> stream = env.fromElements("hello", "world");

159

stream.addSink(sink);

160

env.execute();

161

162

// Verify results

163

assertEquals(Arrays.asList("hello", "world"), results);

164

165

// Thread-safe version

166

TestListResultSink<Integer> threadSafeSink = TestListResultSink.createThreadSafe();

167

stream.addSink(threadSafeSink);

168

env.execute();

169

List<Integer> safeResults = threadSafeSink.getResults();

170

```

171

172

### Upsert Testing Framework

173

174

Specialized testing framework for upsert operations in streaming applications.

175

176

#### UpsertTestSink

177

178

Test sink specifically designed for testing upsert behavior in streaming applications.

179

180

```java { .api }

181

/**

182

* Test sink for upsert operations

183

* Tracks inserts, updates, and deletes separately

184

*/

185

class UpsertTestSink<IN> implements Sink<IN> {

186

/** Get all received records with their operation types */

187

List<UpsertRecord<IN>> getUpsertResults();

188

189

/** Get only insert records */

190

List<IN> getInsertResults();

191

192

/** Get only update records */

193

List<IN> getUpdateResults();

194

195

/** Get only delete records */

196

List<IN> getDeleteResults();

197

198

/** Clear all collected results */

199

void clearResults();

200

}

201

202

/**

203

* Builder for UpsertTestSink

204

*/

205

class UpsertTestSinkBuilder<IN> {

206

/** Set key fields for upsert operations */

207

UpsertTestSinkBuilder<IN> setKeyFields(String... keyFields);

208

209

/** Enable changelog mode tracking */

210

UpsertTestSinkBuilder<IN> enableChangelogMode();

211

212

/** Build the upsert test sink */

213

UpsertTestSink<IN> build();

214

}

215

216

/**

217

* Record in upsert sink with operation type

218

*/

219

class UpsertRecord<T> {

220

T getRecord();

221

UpsertOperation getOperation(); // INSERT, UPDATE, DELETE

222

long getTimestamp();

223

}

224

```

225

226

**Usage Examples:**

227

228

```java

229

import org.apache.flink.connector.upserttest.sink.UpsertTestSink;

230

import org.apache.flink.connector.upserttest.sink.UpsertTestSinkBuilder;

231

232

// Create upsert test sink

233

UpsertTestSink<MyRecord> upsertSink = new UpsertTestSinkBuilder<MyRecord>()

234

.setKeyFields("id")

235

.enableChangelogMode()

236

.build();

237

238

// Use in streaming pipeline

239

DataStream<MyRecord> changelogStream = // ... your upsert stream

240

changelogStream.sinkTo(upsertSink);

241

env.execute();

242

243

// Verify upsert behavior

244

List<MyRecord> inserts = upsertSink.getInsertResults();

245

List<MyRecord> updates = upsertSink.getUpdateResults();

246

List<UpsertRecord<MyRecord>> allChanges = upsertSink.getUpsertResults();

247

248

assertEquals(3, inserts.size());

249

assertEquals(2, updates.size());

250

```

251

252

### Test Data Generators

253

254

Pre-built test datasets for common algorithms and testing scenarios.

255

256

#### Algorithm Test Data

257

258

```java { .api }

259

/**

260

* Test data for connected components algorithm

261

*/

262

class ConnectedComponentsData {

263

/** Get default vertex data for testing */

264

static DataSet<Tuple2<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env);

265

266

/** Get default edge data for testing */

267

static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);

268

269

/** Get expected results for default dataset */

270

static String getExpectedResult();

271

}

272

273

/**

274

* Test data for word count examples

275

*/

276

class WordCountData {

277

/** Get default text data for word count testing */

278

static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env);

279

280

/** Get expected word count results */

281

static String getExpectedResult();

282

}

283

284

/**

285

* Test data for K-means clustering

286

*/

287

class KMeansData {

288

/** Get default point data for clustering */

289

static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);

290

291

/** Get default centroid data */

292

static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);

293

294

/** Get expected clustering results */

295

static String getExpectedResult();

296

}

297

298

/**

299

* Test data for PageRank algorithm

300

*/

301

class PageRankData {

302

/** Get default vertices for PageRank testing */

303

static DataSet<Tuple2<Long, Double>> getDefaultVerticesDataSet(ExecutionEnvironment env);

304

305

/** Get default edges for PageRank testing */

306

static DataSet<Tuple2<Long, Long>> getDefaultEdgesDataSet(ExecutionEnvironment env);

307

308

/** Get expected PageRank results */

309

static String getExpectedResult();

310

}

311

```

312

313

**Usage Examples:**

314

315

```java

316

import org.apache.flink.test.testdata.WordCountData;

317

import org.apache.flink.test.testdata.ConnectedComponentsData;

318

319

// Word count test

320

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

321

DataSet<String> text = WordCountData.getDefaultTextLineDataSet(env);

322

// Run word count algorithm

323

String result = // ... algorithm result

324

assertEquals(WordCountData.getExpectedResult(), result);

325

326

// Connected components test

327

DataSet<Tuple2<Long, Long>> vertices =

328

ConnectedComponentsData.getDefaultVertexDataSet(env);

329

DataSet<Tuple2<Long, Long>> edges =

330

ConnectedComponentsData.getDefaultEdgeDataSet(env);

331

// Run connected components algorithm

332

```

333

334

### File and Process Utilities

335

336

#### TestBaseUtils and FileUtils

337

338

Utilities for file operations and process management in tests.

339

340

```java { .api }

341

/**

342

* Base utilities for Flink tests

343

*/

344

class TestBaseUtils {

345

/** Create temporary directory for test data */

346

static String createTempDirectory();

347

348

/** Clean up test resources */

349

static void cleanup();

350

351

/** Compare an actual result string with the expected result string, line by line, in memory */

352

static void compareResultsByLinesInMemory(String expectedResult, String actualResult);

353

}

354

355

/**

356

* File utilities for testing

357

*/

358

class FileUtils {

359

/** Write string to temporary file */

360

static Path writeToTempFile(String content, String suffix);

361

362

/** Read entire file as string */

363

static String readFileAsString(Path file);

364

365

/** Create temporary directory */

366

static Path createTempDirectory(String prefix);

367

368

/** Delete directory recursively */

369

static void deleteDirectoryRecursively(Path directory);

370

}

371

372

/**

373

* Build and run test processes

374

*/

375

class TestProcessBuilder {

376

/** Create process builder for Java application */

377

static TestProcessBuilder createJavaProcess(Class<?> mainClass);

378

379

/** Add JVM arguments */

380

TestProcessBuilder addJvmArgs(String... args);

381

382

/** Add program arguments */

383

TestProcessBuilder addArgs(String... args);

384

385

/** Start the process and wait for completion */

386

ProcessResult start();

387

}

388

```

389

390

**Usage Examples:**

391

392

```java

393

import org.apache.flink.test.util.FileUtils;

394

import org.apache.flink.test.util.TestProcessBuilder;

395

396

// File operations

397

Path tempFile = FileUtils.writeToTempFile("test data", ".txt");

398

String content = FileUtils.readFileAsString(tempFile);

399

assertEquals("test data", content);

400

401

// Process testing

402

ProcessResult result = TestProcessBuilder

403

.createJavaProcess(MyFlinkJob.class)

404

.addJvmArgs("-Xmx1g")

405

.addArgs("--input", "test-input.txt")

406

.start();

407

408

assertEquals(0, result.getExitCode());

409

```