Test utilities for Apache Flink state migration tests with snapshot generators and utilities for testing backward compatibility across Flink versions.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-migration-test-utils@2.1.0Flink Migration Test Utils provides comprehensive testing utilities specifically designed for Apache Flink's state migration testing framework. It enables developers to generate and validate state snapshots across different Flink versions, ensuring backward compatibility and smooth migration paths for stateful streaming applications.
pom.xml:<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-migration-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>import org.apache.flink.test.util.MigrationTest;
import org.apache.flink.test.migration.MigrationTestsSnapshotGenerator;
import org.apache.flink.test.migration.PublishedVersionUtils;
import org.apache.flink.FlinkVersion;import org.apache.flink.FlinkVersion;
import org.apache.flink.test.util.MigrationTest;
public class MyStateMigrationTest implements MigrationTest {
@SnapshotsGenerator
public void generateSnapshots(FlinkVersion version) throws Exception {
// Generate test snapshots for the given Flink version
// This method will be called automatically during snapshot generation
createAndSaveSnapshot(version);
}
private void createAndSaveSnapshot(FlinkVersion version) {
// Implementation to create state snapshots
// Save snapshots to test resources for migration testing
}
}java org.apache.flink.test.migration.MigrationTestsSnapshotGenerator \
--dir /path/to/project \
--version 2.1 \
--prefixes src/test/java,src/test/scala \
--classes com.example.MyMigrationTestCore interface that migration tests must implement to participate in automated snapshot generation.
public interface MigrationTest {
static FlinkVersion getMostRecentlyPublishedVersion();
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SnapshotsGenerator {}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ParameterizedSnapshotsGenerator {
String value();
}
}The MigrationTest interface provides the foundation for state migration testing:
getMostRecentlyPublishedVersion(): Static method that returns the most recently published Flink version@SnapshotsGenerator: Annotation to mark methods that generate snapshots. Marked methods must have signature void methodName(FlinkVersion version)@ParameterizedSnapshotsGenerator: Annotation for parameterized snapshot generation. Takes the name of a parameter method as its valueCommand-line tool for automated snapshot generation across migration test classes.
public class MigrationTestsSnapshotGenerator {
public static void main(String[] args) throws Throwable;
}Command Line Arguments:
--help: Display usage information--dir <directory>: Root directory for scanning (required)--version <version>: Target Flink version for snapshot generation (required, format: 1.17, v1.17, 1_17, v1_17)--prefixes <paths>: Comma-separated search paths (optional, default: "src/test/java,src/test/scala")--classes <classes>: Comma-separated qualified class names to process (optional, overrides prefixes)The tool automatically discovers migration test classes that:
MigrationTest interface*(Test|ITCase).(java|scala)Utility for retrieving published Flink version information.
public class PublishedVersionUtils {
public static FlinkVersion getMostRecentlyPublishedVersion();
}Reads the most recently published Flink version from the bundled resource file. Used internally by the MigrationTest interface and can be used directly in test implementations.
Internal utility class for executing snapshot generation methods. This class is used by MigrationTestsSnapshotGenerator to invoke the annotated methods in migration test classes.
class SnapshotGeneratorUtils {
static void executeGenerate(Class<?> migrationTestClass, FlinkVersion flinkVersion) throws Throwable;
}Note: This is an internal utility class and not intended for direct use by migration test implementers.
For tests that need to generate multiple snapshots with different parameters:
public class ParameterizedMigrationTest implements MigrationTest {
public Collection<TestConfiguration> generateTestConfigurations(FlinkVersion version) {
// Return different test configurations for the given version
return Arrays.asList(
new TestConfiguration("config1", someValue1),
new TestConfiguration("config2", someValue2)
);
}
@ParameterizedSnapshotsGenerator("generateTestConfigurations")
public void generateSnapshots(TestConfiguration config) throws Exception {
// Generate snapshots using the provided configuration
createSnapshotWithConfig(config);
}
}Add the generation profile to enable automated snapshot generation:
<profile>
<id>generate-migration-test-data</id>
<build>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>generate-migration-test-data</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<java classname="org.apache.flink.test.migration.MigrationTestsSnapshotGenerator"
fork="true" failonerror="true" dir="${project.basedir}">
<classpath refid="maven.test.classpath"/>
<arg value="--dir"/>
<arg line="${project.basedir}"/>
<arg value="--version"/>
<arg value="${generate.version}"/>
</java>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>Execute with:
mvn clean package -Pgenerate-migration-test-data -Dgenerate.version=2.1 -nsu -Dfast -DskipTests// From org.apache.flink.FlinkVersion (external dependency)
import java.util.Optional;
import java.util.Set;
public enum FlinkVersion {
v1_3("1.3"), v1_4("1.4"), v1_5("1.5"), v1_6("1.6"), v1_7("1.7"),
v1_8("1.8"), v1_9("1.9"), v1_10("1.10"), v1_11("1.11"), v1_12("1.12"),
v1_13("1.13"), v1_14("1.14"), v1_15("1.15"), v1_16("1.16"), v1_17("1.17"),
v1_18("1.18"), v1_19("1.19"), v1_20("1.20"), v2_0("2.0"), v2_1("2.1");
public static FlinkVersion valueOf(int majorVersion, int minorVersion);
public static Optional<FlinkVersion> byCode(String code);
public static FlinkVersion current();
public static Set<FlinkVersion> rangeOf(FlinkVersion start, FlinkVersion end);
public boolean isNewerVersionThan(FlinkVersion otherVersion);
public String toString();
}@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SnapshotsGenerator {
// Marks methods for simple snapshot generation
// Method signature: void methodName(FlinkVersion version)
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ParameterizedSnapshotsGenerator {
String value(); // Name of method that provides parameters
// Method signature: void methodName(T parameter)
// Parameter method signature: Collection<T> methodName(FlinkVersion version)
}This library requires the following compile-time dependencies:
org.apache.flink:flink-annotationsorg.apache.flink:flink-test-utils-junitorg.junit.vintage:junit-vintage-enginecommons-cli:commons-cliThe library is designed to be used as a test-scoped dependency and integrates with Flink's testing framework for state migration validation across version upgrades.
The migration test framework provides comprehensive error handling:
FileNotFoundException: Thrown when the specified root directory does not existIllegalArgumentException: Thrown when version format cannot be parsed (must match pattern v?([0-9]+)[._]([0-9]+))ClassNotFoundException: Logged as warning when specified class names cannot be found on classpathRuntimeException: Thrown when:
Version strings must match the regex pattern v?([0-9]+)[._]([0-9]+). Valid examples:
1.17v1.171_17v1_17Invalid examples:
1.17.0 (patch versions not supported)latest (symbolic names not supported)1.17-SNAPSHOT (qualifiers not supported)