# Migration and Compatibility Testing

Utilities for testing state migration between Flink versions and for checking compatibility across version upgrades. They let tests verify how application state evolves and whether state written by an earlier version can still be restored.

## Capabilities

### Migration Test Framework

#### MigrationTest Interface

Marker interface for identifying migration-related tests.

```java { .api }
/**
 * Marker interface for migration tests
 * Used to categorize tests that verify version compatibility
 */
interface MigrationTest {
    // Marker interface - no methods
}
```

**Usage Examples:**

```java
import org.apache.flink.test.util.MigrationTest;
import org.junit.Test;

public class MyStateMigrationTest implements MigrationTest {
    @Test
    public void testStateCompatibilityAcrossVersions() {
        // Migration test implementation goes here
    }
}
```
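Because a marker interface declares no methods, its only effect is that tooling can recognize implementing classes by type. The following is a minimal sketch of that idea and is not Flink code: `MigrationMarker`, `StateUpgradeTest`, `PlainUnitTest`, and `MarkerFilter` are hypothetical names, introduced so that the example compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a marker interface such as MigrationTest.
interface MigrationMarker {}

// Hypothetical test classes: only the first one carries the marker.
class StateUpgradeTest implements MigrationMarker {}

class PlainUnitTest {}

class MarkerFilter {
    /** Returns the candidate classes that implement the marker interface. */
    static List<Class<?>> selectMigrationTests(List<Class<?>> candidates) {
        List<Class<?>> selected = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            // isAssignableFrom is true when candidate implements MigrationMarker
            if (MigrationMarker.class.isAssignableFrom(candidate)) {
                selected.add(candidate);
            }
        }
        return selected;
    }
}
```

A build or test runner could use a check of this kind to run migration tests separately from ordinary unit tests.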

### Snapshot Generation

#### MigrationTestsSnapshotGenerator

Utility for generating test snapshots that can be used for migration testing across Flink versions.

```java { .api }
/**
 * Generate snapshots for migration testing
 * Creates savepoints and checkpoints for compatibility testing
 */
class MigrationTestsSnapshotGenerator {
    /** Generate snapshot for current version */
    static void generateSnapshot(String snapshotPath, StreamExecutionEnvironment env) throws Exception;

    /** Generate snapshot with specific configuration */
    static void generateSnapshot(String snapshotPath, StreamExecutionEnvironment env,
                                 Configuration config) throws Exception;

    /** Validate snapshot format */
    static boolean validateSnapshotFormat(String snapshotPath);

    /** Get snapshot metadata */
    static SnapshotMetadata getSnapshotMetadata(String snapshotPath);
}

/**
 * Metadata about generated snapshots
 */
class SnapshotMetadata {
    /** Get Flink version that created the snapshot */
    String getFlinkVersion();

    /** Get checkpoint/savepoint timestamp */
    long getTimestamp();

    /** Get state backend information */
    String getStateBackend();

    /** Get operator state information */
    Map<String, OperatorState> getOperatorStates();
}
```

**Usage Examples:**

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.migration.MigrationTestsSnapshotGenerator;

import static org.junit.Assert.assertTrue;

// Generate snapshot for current version
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... configure job with stateful operators
MigrationTestsSnapshotGenerator.generateSnapshot("/path/to/snapshot", env);

// Validate existing snapshot
boolean isValid = MigrationTestsSnapshotGenerator.validateSnapshotFormat("/path/to/old/snapshot");
assertTrue(isValid);

// Get snapshot information
SnapshotMetadata metadata = MigrationTestsSnapshotGenerator.getSnapshotMetadata("/path/to/snapshot");
System.out.println("Created by Flink version: " + metadata.getFlinkVersion());
```
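Migration tests usually keep one snapshot per source version, so it helps to know which versions still lack one. The sketch below shows one way to check this with plain `java.nio`. It does not call the generator above, and the `SnapshotLayout` class and the `<testName>-flink<version>-savepoint` naming scheme are assumptions made for illustration only.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the naming scheme below is an assumption, not a Flink rule.
class SnapshotLayout {
    /** Builds a directory name such as "my-job-flink1.17-savepoint". */
    static String snapshotName(String testName, String flinkVersion) {
        return testName + "-flink" + flinkVersion + "-savepoint";
    }

    /** Returns the versions that have no snapshot directory under baseDir. */
    static List<String> missingSnapshots(Path baseDir, String testName, List<String> versions) {
        List<String> missing = new ArrayList<>();
        for (String version : versions) {
            Path expected = baseDir.resolve(snapshotName(testName, version));
            if (!Files.isDirectory(expected)) {
                missing.add(version);
            }
        }
        return missing;
    }
}
```

The versions returned by `missingSnapshots` are the ones for which a snapshot would still need to be generated before a restore test can run against them.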

### Version Utilities

#### PublishedVersionUtils

Utilities for working with published Flink versions in migration tests.

```java { .api }
/**
 * Utilities for working with published Flink versions
 * Helps manage version compatibility testing
 */
class PublishedVersionUtils {
    /** Get list of all published Flink versions */
    static List<String> getAllPublishedVersions();

    /** Get versions compatible with current version */
    static List<String> getCompatibleVersions();

    /** Check if version supports specific feature */
    static boolean supportsFeature(String version, String feature);

    /** Get version comparison result */
    static int compareVersions(String version1, String version2);

    /** Parse version string into components */
    static FlinkVersion parseVersion(String versionString);
}

/**
 * Structured representation of Flink version
 */
class FlinkVersion implements Comparable<FlinkVersion> {
    /** Get major version number */
    int getMajor();

    /** Get minor version number */
    int getMinor();

    /** Get patch version number */
    int getPatch();

    /** Get version string representation */
    String toString();

    /** Check if this version is newer than other */
    boolean isNewerThan(FlinkVersion other);

    int compareTo(FlinkVersion other);
}
```
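Version strings have to be compared component by component as numbers; a plain string comparison would rank `1.9` above `1.10`. The sketch below illustrates that ordering with a hypothetical `SimpleVersion` class. It is not the `FlinkVersion` implementation, and it assumes input of the form `major.minor` or `major.minor.patch`, with a missing patch treated as 0.

```java
import java.util.Objects;

// Hypothetical stand-in for a structured version; not the Flink class.
final class SimpleVersion implements Comparable<SimpleVersion> {
    final int major;
    final int minor;
    final int patch;

    SimpleVersion(int major, int minor, int patch) {
        this.major = major;
        this.minor = minor;
        this.patch = patch;
    }

    /** Parses "major.minor" or "major.minor.patch"; a missing patch defaults to 0. */
    static SimpleVersion parse(String text) {
        String[] parts = text.trim().split("\\.");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("Expected major.minor[.patch] but got: " + text);
        }
        int patch = parts.length == 3 ? Integer.parseInt(parts[2]) : 0;
        return new SimpleVersion(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), patch);
    }

    /** Orders by major, then minor, then patch, each compared numerically. */
    @Override
    public int compareTo(SimpleVersion other) {
        if (major != other.major) {
            return Integer.compare(major, other.major);
        }
        if (minor != other.minor) {
            return Integer.compare(minor, other.minor);
        }
        return Integer.compare(patch, other.patch);
    }

    boolean isNewerThan(SimpleVersion other) {
        return compareTo(other) > 0;
    }

    // equals and hashCode are kept consistent with compareTo.
    @Override
    public boolean equals(Object o) {
        return o instanceof SimpleVersion && compareTo((SimpleVersion) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(major, minor, patch);
    }

    @Override
    public String toString() {
        return major + "." + minor + "." + patch;
    }
}
```

With this ordering, `SimpleVersion.parse("1.10.0").isNewerThan(SimpleVersion.parse("1.9.3"))` is true, which is the result a migration test needs when deciding which snapshots count as older.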

### Snapshot Utilities

#### SnapshotGeneratorUtils

Low-level utilities for snapshot generation and manipulation.

```java { .api }
/**
 * Utilities for snapshot generation and manipulation
 * Provides fine-grained control over snapshot creation
 */
class SnapshotGeneratorUtils {
    /** Create savepoint from running job */
    static String createSavepoint(JobID jobId, String savepointPath, ClusterClient<?> client)
        throws Exception;

    /** Restore job from savepoint */
    static JobID restoreFromSavepoint(JobGraph jobGraph, String savepointPath,
                                      ClusterClient<?> client) throws Exception;

    /** Convert checkpoint to savepoint */
    static void convertCheckpointToSavepoint(String checkpointPath, String savepointPath)
        throws Exception;

    /** Validate savepoint compatibility */
    static boolean isCompatible(String savepointPath, JobGraph jobGraph);

    /** Extract operator state from snapshot */
    static Map<String, byte[]> extractOperatorState(String snapshotPath, String operatorId)
        throws Exception;

    /** Create synthetic snapshot for testing */
    static void createSyntheticSnapshot(String snapshotPath, Map<String, Object> stateData)
        throws Exception;
}
```