or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- cancellation-testing.md
- checkpointing-migration.md
- fault-tolerance-recovery.md
- index.md
- operator-lifecycle.md
- plugin-testing.md
- runtime-utilities.md
- session-window-testing.md
- state-backend-restore.md
- test-data-utilities.md

docs/checkpointing-migration.md

# Checkpointing and Migration Testing

A framework for testing snapshot migration across Flink versions, with utilities for state validation and checkpoint management. It supports testing state compatibility, migration correctness, and recovery from checkpoints.


## Capabilities

### Snapshot Migration Test Base

Abstract base class for testing snapshot migration across Flink versions. It provides a structured approach for creating snapshots in one Flink version and restoring them in another.


```java { .api }

11

/**

12

* Base class for testing snapshot migration across Flink versions

13

*/

14

public abstract class SnapshotMigrationTestBase {

15

/**

16

* Execute job and create snapshot for migration testing

17

* @param job JobGraph to execute and snapshot

18

* @return SnapshotSpec containing snapshot metadata

19

* @throws Exception if execution or snapshotting fails

20

*/

21

protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;

22

23

/**

24

* Restore from snapshot and execute job to validate migration

25

* @param job JobGraph to execute with restored state

26

* @param snapshot SnapshotSpec containing snapshot location and metadata

27

* @throws Exception if restore or execution fails

28

*/

29

protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;

30

31

/**

32

* Snapshot specification containing metadata for migration testing

33

*/

34

public static class SnapshotSpec {

35

/**

36

* Get the filesystem path to the snapshot

37

* @return String path to snapshot directory

38

*/

39

public String getSnapshotPath();

40

41

/**

42

* Get the Flink version that created this snapshot

43

* @return String version identifier

44

*/

45

public String getSnapshotVersion();

46

}

47

}

48

```

### Migration Test Utilities

Utilities for migration testing: sources that checkpoint or verify operator state, sources that inject failures or cancellation, and sinks that count, accumulate, or validate output.


```java { .api }

55

/**

56

* Utility class providing components for migration testing scenarios

57

*/

58

public class MigrationTestUtils {

59

60

/**

61

* Source function with operator list state for migration testing

62

*/

63

public static class CheckpointingNonParallelSourceWithListState

64

implements SourceFunction<Integer> {

65

66

/**

67

* Constructor for checkpointing source with list state

68

* @param numElements number of elements to emit

69

*/

70

public CheckpointingNonParallelSourceWithListState(int numElements);

71

}

72

73

/**

74

* Source function for validating restored list state after migration

75

*/

76

public static class CheckingNonParallelSourceWithListState

77

implements SourceFunction<Integer> {

78

79

/**

80

* Constructor for validation source

81

* @param numElements number of elements for validation

82

*/

83

public CheckingNonParallelSourceWithListState(int numElements);

84

}

85

86

/**

87

* Parallel source with union list state for migration testing

88

*/

89

public static class CheckpointingParallelSourceWithUnionListState

90

implements SourceFunction<Integer> {

91

92

/**

93

* Constructor for parallel source with union state

94

* @param numElements elements per subtask

95

*/

96

public CheckpointingParallelSourceWithUnionListState(int numElements);

97

}

98

99

/**

100

* Parallel source for validating union list state after migration

101

*/

102

public static class CheckingParallelSourceWithUnionListState

103

implements SourceFunction<Integer> {

104

105

/**

106

* Constructor for parallel validation source

107

* @param numElements number of elements for validation

108

*/

109

public CheckingParallelSourceWithUnionListState(int numElements);

110

}

111

112

/**

113

* Sink that counts elements using Flink accumulators

114

*/

115

public static class AccumulatorCountingSink<T> implements SinkFunction<T> {

116

117

/**

118

* Constructor for accumulator counting sink

119

* @param accumulatorName name of the accumulator to use

120

*/

121

public AccumulatorCountingSink(String accumulatorName);

122

}

123

124

/**

125

* Source with configurable failure injection for testing recovery

126

*/

127

public static class FailingSource implements SourceFunction<Integer> {

128

129

/**

130

* Constructor for failing source

131

* @param failAfterElements number of elements before failure

132

* @param maxElements maximum elements to emit after recovery

133

*/

134

public FailingSource(int failAfterElements, int maxElements);

135

}

136

137

/**

138

* Source for testing job cancellation scenarios

139

*/

140

public static class CancellingIntegerSource implements SourceFunction<Integer> {

141

142

/**

143

* Constructor for cancelling source

144

* @param cancelAfterElements elements to emit before triggering cancellation

145

*/

146

public CancellingIntegerSource(int cancelAfterElements);

147

}

148

149

/**

150

* Sink that accumulates integer values for validation

151

*/

152

public static class AccumulatingIntegerSink implements SinkFunction<Integer> {

153

154

/**

155

* Constructor for accumulating sink

156

* @param outputList list to accumulate values into

157

*/

158

public AccumulatingIntegerSink(List<Integer> outputList);

159

}

160

161

/**

162

* Sink for validating output against expected values

163

*/

164

public static class ValidatingSink<T> implements SinkFunction<T> {

165

166

/**

167

* Constructor for validating sink

168

* @param expectedValues expected output values for validation

169

*/

170

public ValidatingSink(List<T> expectedValues);

171

}

172

}

173

```

**Usage Examples:**


```java

178

import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;

179

import org.apache.flink.test.checkpointing.utils.MigrationTestUtils.*;

180

181

// Basic migration test

182

public class StateMigrationTest extends SnapshotMigrationTestBase {

183

184

@Test

185

public void testListStateMigration() throws Exception {

186

// Create job with stateful source

187

JobGraph job = new JobGraph();

188

job.addVertex(new JobVertex("source",

189

new CheckpointingNonParallelSourceWithListState(100)));

190

job.addVertex(new JobVertex("sink",

191

new AccumulatorCountingSink<>("count")));

192

193

// Execute and create snapshot

194

SnapshotSpec snapshot = executeAndSnapshot(job);

195

196

// Create validation job

197

JobGraph validationJob = new JobGraph();

198

validationJob.addVertex(new JobVertex("validation-source",

199

new CheckingNonParallelSourceWithListState(

200

Arrays.asList(1, 2, 3, 4, 5))));

201

202

// Restore and validate

203

restoreAndExecute(validationJob, snapshot);

204

}

205

206

@Test

207

public void testUnionStateMigration() throws Exception {

208

// Test parallel source with union state

209

JobGraph job = new JobGraph();

210

JobVertex sourceVertex = new JobVertex("parallel-source",

211

new CheckpointingParallelSourceWithUnionListState(50));

212

sourceVertex.setParallelism(4);

213

job.addVertex(sourceVertex);

214

215

SnapshotSpec snapshot = executeAndSnapshot(job);

216

217

// Validation with different parallelism

218

JobGraph validationJob = new JobGraph();

219

JobVertex validationVertex = new JobVertex("validation-source",

220

new CheckingParallelSourceWithUnionListState(

221

expectedUnionState));

222

validationVertex.setParallelism(2);

223

validationJob.addVertex(validationVertex);

224

225

restoreAndExecute(validationJob, snapshot);

226

}

227

}

228

```