# Flink Tests

The Flink Tests module provides testing infrastructure and utilities for Apache Flink applications. Packaged as a test-jar, it offers data generators, test base classes, a migration testing framework, and utilities for testing stream processing applications, covering fault tolerance, state management, and performance validation.

## Package Information

- **Package Name**: flink-tests_2.11
- **Package Type**: maven
- **Language**: Java/Scala
- **Installation**:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-tests_2.11</artifactId>
    <version>1.8.3</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>
```

## Core Imports

```java
// Test data generation
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.operators.util.ValueCollectionDataSets;

// Input format utilities
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;

// Migration testing
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;

// Streaming utilities and fault injection
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.streaming.runtime.util.TestListWrapper;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;

// Test base classes
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;

// State management testing
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractKeyedOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractNonKeyedOperatorRestoreTestBase;

// Test functions
import org.apache.flink.test.testfunctions.Tokenizer;
```

## Basic Usage

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;

// Create test data for batch processing tests
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> testData = CollectionDataSets.get3TupleDataSet(env);

// Use the data in your test
testData.map(/* your transformation */).collect();
```

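```java
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

// Collect results in streaming tests
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<String> sink = new TestListResultSink<>();

// Any DataStream<String> works here; a small fixed input keeps the example self-contained
DataStream<String> dataStream = env.fromElements("a", "b", "c");
dataStream.addSink(sink);
env.execute();

// Read the collected elements after the job has finished
List<String> results = sink.getResult();
```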

## Architecture

The Flink Tests module is organized around several key components:

- **Data Generation**: Standardized test datasets for various data types and structures
- **Migration Framework**: Complete infrastructure for testing savepoint and checkpoint migration across Flink versions
- **Streaming Utilities**: Thread-safe result collection and streaming-specific test helpers
- **Test Base Classes**: Abstract base classes providing common test patterns for different scenarios
- **Fault Tolerance Testing**: Specialized sources and sinks for testing failure scenarios and recovery
- **State Management Testing**: Framework for validating operator state restoration and migration
- **Performance Testing**: Manual programs for benchmarking and scalability testing

## Capabilities

### Test Data Generation

Comprehensive data generators providing standardized test datasets, input format utilities, and test functions for DataSet and DataStream operations. Includes tuple data, POJO data, nested structures, custom types, infinite data generators, and common transformation functions.

```java { .api }
// 3-tuple dataset with 21 records
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);

// Custom POJO dataset with 21 records
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);

// Small datasets for quick tests (3 records each)
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);

// Input format utilities for continuous data generation
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
    public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);
}

public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {
    public InfiniteIntegerInputFormat(boolean addDelay);
}

// Test functions for common transformations
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
}
```

[Test Data Generation](./test-data-generation.md)
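
As a rough illustration of what a tokenizer-style test function does, the sketch below splits a line into lowercase words and pairs each word with a count of 1. It uses plain Java collections rather than Flink's `FlatMapFunction` and `Collector`, so it runs without Flink on the classpath. The class name `TokenizerSketch` and the splitting rule (lowercase, split on non-word characters) are assumptions made for illustration, not the module's actual implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a tokenizer-style flatMap; not Flink's Tokenizer class.
public class TokenizerSketch {

    // Splits a line into lowercase words and emits one (word, 1) pair per word.
    public static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            // Skip the empty token that split() can produce for leading separators
            if (!token.isEmpty()) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("To be, or not to be"));
    }
}
```

In a real test, the pairs would be emitted through a `Collector` and typically grouped and summed downstream; the list return value here only stands in for that output.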

### Migration Testing Framework

Complete framework for testing savepoint and checkpoint migration across different Flink versions. Provides base classes, utilities, and pre-configured sources/sinks for migration validation.

```java { .api }
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
    protected String getResourceFilename(String filename);
    protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}
```

[Migration Testing Framework](./migration-testing.md)
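
Both methods above take a list of expected accumulator values. One way such a check could work is to compare each expected name and value pair against the accumulators a running job currently reports, and proceed only once all of them match. The sketch below shows that comparison in plain Java. The class `AccumulatorCheckSketch`, the method `allReached`, and the accumulator name are hypothetical, and `Map.Entry` stands in for Flink's `Tuple2`; this is an assumption about the idea, not the base class's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating an "expected accumulators" check; not Flink code.
public class AccumulatorCheckSketch {

    // Returns true only if every expected accumulator is present with the expected value.
    @SafeVarargs
    public static boolean allReached(Map<String, Integer> actual,
                                     Map.Entry<String, Integer>... expected) {
        for (Map.Entry<String, Integer> e : expected) {
            Integer value = actual.get(e.getKey());
            if (value == null || !value.equals(e.getValue())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> actual = new HashMap<>();
        actual.put("elementsSeen", 100); // hypothetical accumulator name
        System.out.println(allReached(actual, new SimpleEntry<>("elementsSeen", 100)));
    }
}
```

A test harness would typically call a check like this in a polling loop with a deadline, since accumulator values only reach their targets after the job has processed enough input.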

### Streaming Test Utilities

Thread-safe utilities for collecting and validating results in streaming applications. Includes result sinks, test wrappers, fault injection sources, and streaming-specific helper functions.

```java { .api }
public class TestListResultSink<T> extends RichSinkFunction<T> {
    public List<T> getResult();
    public List<T> getSortedResult();
}

public class TestListWrapper {
    public static TestListWrapper getInstance();
    public int createList();
    public List<Object> getList(int listId);
}

public class FailingSource<T> extends RichSourceFunction<T> {
    public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
    public static interface EventEmittingGenerator<T> extends Serializable {
        public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
    }
}

public class ValidatingSink<T> extends RichSinkFunction<T> {
    public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
    public static interface ResultChecker<T> extends Serializable {
        public boolean checkResult(T result);
    }
    public static interface CountUpdater extends Serializable {
        public void updateCount(long count);
    }
}
```

[Streaming Test Utilities](./streaming-utilities.md)
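
The `getInstance()`, `createList()`, and `getList(int)` signatures suggest a process-wide registry of result lists addressed by integer id. A likely motivation is that Flink serializes sink functions before running them, so the sink instance a test holds is not the one that receives records; a small id survives serialization, while a direct list reference would not be shared. The sketch below models that idea in plain Java. `ListRegistrySketch` and `runDemo` are hypothetical names, and this is a simplified stand-in under those assumptions, not Flink's `TestListWrapper`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified registry in the spirit of TestListWrapper; not Flink's code.
public class ListRegistrySketch {

    private static final ListRegistrySketch INSTANCE = new ListRegistrySketch();

    // Each entry is a synchronized list; its index in this outer list is the list id.
    private final List<List<Object>> lists = Collections.synchronizedList(new ArrayList<>());

    private ListRegistrySketch() {}

    public static ListRegistrySketch getInstance() {
        return INSTANCE;
    }

    // Registers a new result list and returns its id.
    public synchronized int createList() {
        lists.add(Collections.synchronizedList(new ArrayList<>()));
        return lists.size() - 1;
    }

    public List<Object> getList(int listId) {
        return lists.get(listId);
    }

    // Demo: several writer threads append to one registered list, looked up by id.
    public static int runDemo(int writers, int perWriter) {
        int id = getInstance().createList();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    getInstance().getList(id).add(i);
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return getInstance().getList(id).size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000));
    }
}
```

Because every list is wrapped with `Collections.synchronizedList`, concurrent `add` calls from parallel writers do not lose elements, which is the property a parallel sink needs from its result store.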

### Test Base Classes

Abstract base classes providing common patterns for different testing scenarios including cancellation testing, fault tolerance testing, and recovery testing.

```java { .api }
public abstract class StreamFaultToleranceTestBase extends TestLogger {
    public static final int NUM_TASK_MANAGERS = 2;
    public static final int NUM_TASK_SLOTS = 8;
    public static final int PARALLELISM = 4;

    public abstract void testProgram(StreamExecutionEnvironment env);
    public abstract void postSubmit() throws Exception;
}

public abstract class SimpleRecoveryITCaseBase extends TestLogger {
    protected abstract void executeRecoveryTest() throws Exception;
}
```

[Test Base Classes](./test-base-classes.md)

### Class Loading Test Programs

Complete programs for testing dynamic class loading, user code isolation, and class loading policies. Each program serves as a standalone test case for different class loading scenarios.

```java { .api }
public class StreamingProgram {
    public static void main(String[] args) throws Exception;
}

public class CheckpointedStreamingProgram {
    public static void main(String[] args) throws Exception;
}

public class KMeansForTest {
    public static void main(String[] args) throws Exception;
    public static class Point { /* 2D point representation */ }
    public static class Centroid extends Point { /* cluster center */ }
}
```

[Class Loading Test Programs](./class-loading-programs.md)

### State Management Testing

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios.

```java { .api }
public enum ExecutionMode {
    GENERATE, MIGRATE, RESTORE
}

public abstract class AbstractOperatorRestoreTestBase {
    protected abstract StreamExecutionEnvironment createMigrationJob(String savepointPath) throws Exception;
    protected abstract StreamExecutionEnvironment createRestoredJob(String savepointPath) throws Exception;
    protected abstract String getMigrationSavepointName();
}

public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Specialized testing for keyed state operators
}

public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Specialized testing for non-keyed state operators
}

public class KeyedJob {
    public static void main(String[] args) throws Exception;
}

public class NonKeyedJob {
    public static void main(String[] args) throws Exception;
}
```

[State Management Testing](./state-management-testing.md)

### Performance Testing Programs

Manual programs for performance benchmarking, scalability testing, and resource usage validation. These programs are designed for manual execution and analysis.

```java { .api }
public class MassiveStringSorting {
    public static void main(String[] args) throws Exception;
}

public class StreamingScalabilityAndLatency {
    public static void main(String[] args) throws Exception;
}

public class ReducePerformance {
    public static void main(String[] args) throws Exception;
}
```

[Performance Testing Programs](./performance-testing.md)

## Common Types

```java { .api }
// Test data POJO
public static class CustomType {
    public int myInt;
    public long myLong;
    public String myString;

    public CustomType();
    public CustomType(int i, long l, String s);
}

// Success indication exception
public class SuccessException extends RuntimeException {
    public SuccessException();
}

// Simple integer wrapper for testing
public class IntType {
    public int value;
    public IntType(int value);
}

// Test enum
public enum Category {
    CAT_A, CAT_B
}
```

## Error Handling

The testing framework includes specialized exceptions and error handling patterns:

- **SuccessException**: RuntimeException thrown to indicate successful test completion in scenarios where normal completion is not expected
- **Fault injection**: Controlled failure injection through FailingSource and related utilities
- **Validation failures**: Clear error reporting through ValidatingSink and migration test utilities
- **Timeout handling**: Configurable timeouts for long-running test operations

Tests should handle these exceptions appropriately and use the provided error injection mechanisms for fault tolerance testing.
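
The success-by-exception pattern can be sketched as follows: a job that never finishes on its own throws a marker exception once it has seen enough data, the runtime wraps that exception in a generic failure, and the test walks the cause chain to tell success from a real error. The code below is plain Java with hypothetical names (`SuccessExceptionSketch`, `runJob`, `containsSuccess`, `jobSucceeded`) and a locally defined `SuccessException`, so it only approximates how a test against the real class might look.

```java
// Hypothetical local stand-ins; the real SuccessException lives in the Flink test module.
public class SuccessExceptionSketch {

    public static class SuccessException extends RuntimeException {}

    // Returns true if any exception in the cause chain is a SuccessException.
    public static boolean containsSuccess(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    // Simulates an unbounded job: after enough records it throws SuccessException,
    // which is then wrapped in a generic failure, as a runtime might do.
    public static void runJob(int recordsNeeded) {
        try {
            for (int seen = 1; ; seen++) {
                if (seen >= recordsNeeded) {
                    throw new SuccessException();
                }
            }
        } catch (RuntimeException e) {
            throw new RuntimeException("Job execution failed", e);
        }
    }

    // Test-side handling: the success marker means pass; any other failure is rethrown.
    public static boolean jobSucceeded(int recordsNeeded) {
        try {
            runJob(recordsNeeded);
            return false; // normal completion is not expected in this scenario
        } catch (RuntimeException e) {
            if (containsSuccess(e)) {
                return true;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(jobSucceeded(100));
    }
}
```

Rethrowing anything that is not the success marker matters here: swallowing all exceptions would make genuine failures look like passing tests.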