# Apache Flink Tests


Apache Flink Tests provides comprehensive integration testing infrastructure for the Flink stream processing framework. This module contains test utilities, base classes, helper components, and testing patterns that enable thorough validation of Flink's streaming and batch processing capabilities, fault tolerance mechanisms, state management, and migration scenarios.


## Package Information


- **Package Name**: flink-tests_2.11
- **Package Type**: maven
- **Language**: Java with Scala support
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-tests_2.11
- **Version**: 1.5.1
- **Installation**: Add it as a Maven dependency with `test` scope


## Core Imports

```java
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.cancelling.CancelingTestBase;
```

For Scala tests:

```scala
import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase
import org.apache.flink.api.scala.migration.MigrationTestTypes._
```

## Basic Usage

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

import static org.junit.Assert.assertEquals;

public class MyFlinkTest extends StreamFaultToleranceTestBase {

    // Keep a handle on the sink so its results can be read after the job finishes
    private final TestListResultSink<Integer> sink = new TestListResultSink<>();

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        // Define your test streaming topology
        env.fromElements(1, 2, 3, 4, 5)
            .map(x -> x * 2)
            .addSink(sink);
    }

    @Override
    public void postSubmit() throws Exception {
        // Parallel subtasks may emit in any order, so verify the sorted result
        List<Integer> results = sink.getSortedResult();
        assertEquals(Arrays.asList(2, 4, 6, 8, 10), results);
    }
}
```

## Architecture


The flink-tests module is built around several key testing patterns:

- **Test Base Classes**: Abstract base classes providing standardized setup for different testing scenarios (fault tolerance, cancellation, migration, recovery)
- **Utility Components**: Helper classes for common testing operations (TestUtils, TestListResultSink, MigrationTestUtils)
- **Mock and Stub Components**: Testing implementations of Flink functions and operators for controlled testing scenarios
- **Configuration Infrastructure**: Standardized cluster setup and configuration management for reproducible test environments
- **Coordination Mechanisms**: File-based and accumulator-based coordination between test orchestration and job execution

Together, these patterns support Flink's own integration tests and double as reusable components for custom testing scenarios.


## Capabilities


### Test Base Classes


Foundation classes that provide standardized setup, execution patterns, and infrastructure for different types of Flink testing scenarios including fault tolerance, state migration, job cancellation, and recovery testing.

```java { .api }
public abstract class StreamFaultToleranceTestBase extends TestLogger {
    public abstract void testProgram(StreamExecutionEnvironment env);
    public abstract void postSubmit() throws Exception;
}

public abstract class SavepointMigrationTestBase extends TestBaseUtils {
    protected final void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    protected final void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}

public abstract class CancelingTestBase extends TestLogger {
    protected void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception;
}
```

[Test Base Classes](./test-base-classes.md)
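
The two abstract methods on `StreamFaultToleranceTestBase` follow a template-method shape: the subclass describes the job and its verification, while the base class owns the lifecycle. The sketch below illustrates that shape without any Flink dependency. `TemplateTestBase`, `FakeEnv`, and `DoublingTest` are hypothetical names introduced here, and the exact sequence of steps is an assumption rather than the module's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of the template-method shape of a test base class.
abstract class TemplateTestBase {

    // Stand-in for an execution environment: it only records what happened.
    static final class FakeEnv {
        final List<String> log = new ArrayList<>();

        void execute(String jobName) {
            log.add("execute:" + jobName);
        }
    }

    // Subclasses describe the job under test...
    abstract void testProgram(FakeEnv env);

    // ...and verify its effects once the job has finished.
    abstract void postSubmit() throws Exception;

    // The base class owns the lifecycle: set up, build the job, run it, verify.
    final List<String> run() throws Exception {
        FakeEnv env = new FakeEnv();
        env.log.add("setup");
        testProgram(env);
        env.execute("test-job");
        postSubmit();
        env.log.add("verified");
        return env.log;
    }
}

// A concrete test only fills in the two hooks.
final class DoublingTest extends TemplateTestBase {

    @Override
    void testProgram(FakeEnv env) {
        env.log.add("topology:map(x -> x * 2)");
    }

    @Override
    void postSubmit() {
        // Verification logic would go here.
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new DoublingTest().run());
    }
}
```

Keeping `run()` final in the base class means every subclass goes through the same setup and verification order, which is what makes results comparable across tests.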


### Test Utilities


Core utility classes and helper functions for common testing operations including execution handling, result collection, and test coordination mechanisms.

```java { .api }
public class TestUtils {
    public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception;
}

public class TestListResultSink<T> extends RichSinkFunction<T> {
    public List<T> getResult();
    public List<T> getSortedResult();
}

public class TestListWrapper {
    public static int createList();
    public static <T> List<T> getList(int listId);
}
```

[Test Utilities](./test-utilities.md)
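
The `createList()` and `getList(int)` pair on `TestListWrapper` suggests a static registry: a sink that is serialized and shipped to a task needs to carry only an integer id, and the test thread looks up the shared list by that id. The sketch below shows that pattern in plain Java. `ListRegistry` is a hypothetical stand-in, and its internals are an assumption, not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a static list registry; not the Flink implementation.
final class ListRegistry {

    // All lists live in one JVM-wide holder so any thread can reach them by id.
    private static final List<List<Object>> LISTS = new ArrayList<>();

    private ListRegistry() {}

    // Registers a new synchronized list and returns its id.
    static synchronized int createList() {
        LISTS.add(Collections.synchronizedList(new ArrayList<>()));
        return LISTS.size() - 1;
    }

    // Looks up a previously created list; the unchecked cast mirrors the generic getter.
    @SuppressWarnings("unchecked")
    static synchronized <T> List<T> getList(int listId) {
        return (List<T>) (List<?>) LISTS.get(listId);
    }

    public static void main(String[] args) {
        int id = createList();
        // A sink would carry only the id and append through the registry.
        ListRegistry.<Integer>getList(id).add(2);
        ListRegistry.<Integer>getList(id).add(4);
        System.out.println(ListRegistry.<Integer>getList(id));
    }
}
```

This only works when the job runs in the same JVM as the test, which is the case for a local mini cluster.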


### State Migration Testing


Comprehensive infrastructure for testing operator state migration and compatibility across Flink versions, including utilities for savepoint creation, restoration, and verification.

```java { .api }
public class MigrationTestUtils {
    public static class CheckpointingNonParallelSourceWithListState implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction;
    public static class CheckingNonParallelSourceWithListState extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction;
    public static class AccumulatorCountingSink<T> extends RichSinkFunction<T>;
}

public abstract class AbstractOperatorRestoreTestBase {
    public abstract JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception;
    public abstract JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception;
}
```

[State Migration Testing](./state-migration.md)
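
The `expectedAccumulators` arguments on `executeAndSavepoint` and `restoreAndExecute` point to accumulator-based coordination: the test can wait until the running job reports the expected counts before it acts. The sketch below shows one way such a wait could look in plain Java. `AccumulatorWait`, its method names, and the polling interval are all assumptions made for illustration, not the module's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical polling helper; illustrates accumulator-based coordination only.
final class AccumulatorWait {

    private AccumulatorWait() {}

    // True once every expected accumulator is present with exactly the expected value.
    static boolean allReached(Map<String, Integer> expected, Map<String, Integer> current) {
        for (Map.Entry<String, Integer> entry : expected.entrySet()) {
            Integer value = current.get(entry.getKey());
            if (value == null || !value.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    // Polls the supplier until all expected values are reached or the timeout elapses.
    static boolean await(Map<String, Integer> expected,
                         Supplier<Map<String, Integer>> fetchCurrent,
                         long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!allReached(expected, fetchCurrent.get())) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> expected = new HashMap<>();
        expected.put("records-seen", 3);

        // Simulated job: every poll advances the counter by one.
        Map<String, Integer> live = new HashMap<>();
        boolean reached = await(expected, () -> {
            live.merge("records-seen", 1, Integer::sum);
            return live;
        }, 1_000);
        System.out.println("expected accumulators reached: " + reached);
    }
}
```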


### Recovery and Fault Tolerance


Testing infrastructure for job recovery scenarios, restart strategies, failure simulation, and fault tolerance validation including TaskManager process failures.

```java { .api }
public abstract class SimpleRecoveryITCaseBase {
    // Abstract methods for defining failing and successful execution plans
}

public abstract class AbstractTaskManagerProcessFailureRecoveryTest {
    // Constants for file-based coordination
    protected static final String READY_MARKER_FILE_PREFIX = "ready-";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish-";
}
```

[Recovery and Fault Tolerance](./recovery-fault-tolerance.md)
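
The marker-file constants above hint at how a test and the tasks of a job can coordinate across process boundaries through a shared directory. The sketch below is one plausible reading of that protocol in plain Java: tasks create `ready-<index>` files, the test waits for them and then creates `proceed`. `MarkerFiles` and its methods are hypothetical, only two of the three constants are used, and the actual protocol in the module may differ.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper illustrating marker-file coordination; not part of flink-tests.
final class MarkerFiles {

    static final String READY_MARKER_FILE_PREFIX = "ready-";
    static final String PROCEED_MARKER_FILE = "proceed";

    private MarkerFiles() {}

    // Task side: announce readiness by creating an empty "ready-<index>" file.
    static void signalReady(Path dir, int taskIndex) throws IOException {
        Files.createFile(dir.resolve(READY_MARKER_FILE_PREFIX + taskIndex));
    }

    // Test side: count the ready markers currently present in the directory.
    static int countReady(Path dir) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> ready =
                Files.newDirectoryStream(dir, READY_MARKER_FILE_PREFIX + "*")) {
            for (Path ignored : ready) {
                count++;
            }
        }
        return count;
    }

    // Test side: poll until `expected` tasks are ready or the timeout elapses.
    static boolean awaitReady(Path dir, int expected, long timeoutMillis)
            throws IOException, InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (countReady(dir) < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }

    // Test side: allow the waiting tasks to continue.
    static void signalProceed(Path dir) throws IOException {
        Files.createFile(dir.resolve(PROCEED_MARKER_FILE));
    }

    // Task side: check whether the test has released the tasks.
    static boolean mayProceed(Path dir) {
        return Files.exists(dir.resolve(PROCEED_MARKER_FILE));
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("coordination");
        signalReady(dir, 0);
        signalReady(dir, 1);
        if (awaitReady(dir, 2, 1_000)) {
            signalProceed(dir);
        }
        System.out.println("tasks may proceed: " + mayProceed(dir));
    }
}
```

Files are a convenient channel here because they keep working when a TaskManager process is killed mid-test, which an in-memory signal would not.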


### Scala Testing Support


Scala-specific testing components including API completeness validation, migration test types, and Scala-specific utility functions for comprehensive Scala API testing.

```scala { .api }
object MigrationTestTypes {
  case class CustomCaseClass(a: String, b: Long)
  case class CustomCaseClassWithNesting(a: Long, nested: CustomCaseClass)
  object CustomEnum extends Enumeration {
    val ONE, TWO, THREE = Value
  }
}

abstract class ScalaAPICompletenessTestBase {
  // Base class for testing Scala API completeness
}
```

[Scala Testing Support](./scala-testing.md)


## Constants and Configuration


### Standard Test Configuration

- **PARALLELISM**: 4 (most base classes)
- **HIGH_PARALLELISM**: 12 (StreamFaultToleranceTestBase)
- **NUM_TASK_MANAGERS**: 3 (StreamFaultToleranceTestBase)
- **NUM_TASK_SLOTS**: 4 (StreamFaultToleranceTestBase)
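
The StreamFaultToleranceTestBase values are mutually consistent: 3 task managers with 4 slots each give 12 slots, which matches `HIGH_PARALLELISM`. The sketch below spells out that arithmetic, assuming default slot sharing, where a job needs as many slots as its highest operator parallelism. `SlotMath` is a hypothetical helper, not part of flink-tests.

```java
// Hypothetical helper that checks whether a parallelism fits a cluster's slots.
final class SlotMath {

    // Values as listed for StreamFaultToleranceTestBase.
    static final int NUM_TASK_MANAGERS = 3;
    static final int NUM_TASK_SLOTS = 4;
    static final int HIGH_PARALLELISM = 12;

    private SlotMath() {}

    // Total slots offered by the cluster.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // With default slot sharing, a job fits if its parallelism does not exceed the slot count.
    static boolean fits(int parallelism, int taskManagers, int slotsPerTaskManager) {
        return parallelism <= totalSlots(taskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        int slots = totalSlots(NUM_TASK_MANAGERS, NUM_TASK_SLOTS);
        System.out.println("total slots: " + slots);
        System.out.println("parallelism fits: "
                + fits(HIGH_PARALLELISM, NUM_TASK_MANAGERS, NUM_TASK_SLOTS));
    }
}
```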


### Cluster Configuration


All base classes provide standardized MiniClusterResource configurations optimized for reliable testing environments with proper timeouts, memory allocation, and task distribution.


## Testing Categories


The flink-tests module enables comprehensive testing across these functional areas:

- **Broadcast Variables Testing** (`org.apache.flink.test.broadcastvars`)
- **Iterative Algorithm Testing** (`org.apache.flink.test.iterative`)
- **Streaming API Testing** (`org.apache.flink.test.streaming`)
- **State Management Testing** (`org.apache.flink.test.state`)
- **Runtime Testing** (`org.apache.flink.test.runtime`)
- **Recovery and Fault Tolerance** (`org.apache.flink.test.recovery`)
- **Operator Testing** (`org.apache.flink.test.operators`)
- **Checkpointing Testing** (`org.apache.flink.test.checkpointing`)
- **Windowing Testing** (`org.apache.flink.test.windowing`)