Comprehensive integration test suite for the Apache Flink stream processing framework, providing test utilities, base classes, and infrastructure for validating fault tolerance, checkpointing, and streaming operations.
Framework for verifying API parity between Java and Scala implementations using reflection-based method comparison. Essential for ensuring consistent API coverage across language bindings.
Abstract base class providing infrastructure for comparing Java and Scala API implementations using reflection.
/**
 * Abstract base class providing infrastructure for comparing Java and Scala API
 * implementations using reflection.
 *
 * <p>NOTE(review): this is a signature listing only; method bodies are not shown here,
 * so the exact behavior of each member should be confirmed against the implementation.
 */
public abstract class ScalaAPICompletenessTestBase {
// Logger created for this class; protected, so subclasses can log through it.
protected static final Logger LOG = LoggerFactory.getLogger(ScalaAPICompletenessTestBase.class);
// Core comparison methods
// Two overloads: one takes only the class pair, the other also takes an explicit
// set of excluded method names.
protected void compareApis(Class<?> javaClass, Class<?> scalaClass);
protected void compareApis(Class<?> javaClass, Class<?> scalaClass, Set<String> excludedMethods);
// Method filtering and exclusion
// Exclusion can be queried by Method object or by method name; subclasses in the
// usage examples override isExcluded(Method) and getExcludedMethods().
protected boolean isExcluded(Method method);
protected boolean isExcluded(String methodName);
protected Set<String> getExcludedMethods();
// Comparison utilities
protected boolean methodsMatch(Method javaMethod, Method scalaMethod);
// Produces the String form of a method used when comparing sets of signatures.
protected String getMethodSignature(Method method);
// apiType is a label for the API the methods are missing from (the examples pass
// "Scala API" and "Java API").
protected void reportMissingMethods(Set<String> missingMethods, String apiType);
}

Abstract base class for testing tuple comparator implementations between Java and Scala.
/**
 * Abstract base class for testing tuple comparator implementations between Java and Scala.
 *
 * <p>NOTE(review): signature listing only; method bodies are not shown here.
 *
 * @param <T> the type handled by the comparators under test
 */
public abstract class TupleComparatorTestBase<T> {
// The two comparators under test; protected so concrete tests can assign them.
protected TypeComparator<T> javaComparator;
protected TypeComparator<T> scalaComparator;
// Abstract and annotated with @Before: each concrete test supplies the fixture setup.
@Before
public abstract void setup();
// Comparison testing methods
protected void testComparatorEquality(T value1, T value2);
protected void testComparatorSerialization();
protected void testComparatorNormalization();
// Utility methods
// Hooks implemented by concrete tests: sample values plus one factory per comparator.
protected abstract T[] getTestData();
protected abstract TypeComparator<T> createJavaComparator();
protected abstract TypeComparator<T> createScalaComparator();
}

Abstract base class for testing pair comparator implementations.
/**
 * Abstract base class for testing pair comparator implementations.
 *
 * <p>NOTE(review): signature listing only; method bodies are not shown here.
 *
 * @param <T1> the first type of the compared pair
 * @param <T2> the second type of the compared pair
 */
public abstract class PairComparatorTestBase<T1, T2> {
// The two pair comparators under test; protected so concrete tests can assign them.
protected TypePairComparator<T1, T2> javaPairComparator;
protected TypePairComparator<T1, T2> scalaPairComparator;
// Abstract and annotated with @Before: each concrete test supplies the fixture setup.
@Before
public abstract void setup();
// Pair comparison testing
protected void testPairComparatorEquality(T1 first1, T2 second1, T1 first2, T2 second2);
protected void testPairComparatorSerialization();
// Abstract methods for test data
// Concrete tests provide sample values for each side and one factory per comparator.
protected abstract T1[] getFirstTypeTestData();
protected abstract T2[] getSecondTypeTestData();
protected abstract TypePairComparator<T1, T2> createJavaPairComparator();
protected abstract TypePairComparator<T1, T2> createScalaPairComparator();
}

Utility for comparing method signatures between Java and Scala implementations.
/**
 * Utility for comparing method signatures between Java and Scala implementations.
 * All members are static.
 *
 * <p>NOTE(review): signature listing only; the matching and compatibility rules are
 * not visible here and should be confirmed against the implementation.
 */
public class MethodSignatureComparator {
// Compares one Java method against one Scala method.
public static boolean signaturesMatch(Method javaMethod, Method scalaMethod);
// Returns a String form of the method's signature.
public static String normalizeSignature(Method method);
// Separate checks for the return type and for the parameter type arrays.
public static boolean returnTypesCompatible(Class<?> javaReturn, Class<?> scalaReturn);
public static boolean parametersCompatible(Class<?>[] javaParams, Class<?>[] scalaParams);
}

Utilities for reflection-based testing and method analysis.
/**
 * Utilities for reflection-based testing and method analysis. All members are static.
 *
 * <p>NOTE(review): signature listing only; method bodies are not shown here.
 */
public class ReflectionTestUtils {
// Two overloads: one takes only the class, the other also takes a set of excluded names.
public static Set<Method> getPublicMethods(Class<?> clazz);
public static Set<Method> getPublicMethods(Class<?> clazz, Set<String> excludedNames);
// Tests a single method against a set of exclusions.
public static boolean isMethodExcluded(Method method, Set<String> exclusions);
// Returns a String signature for the given method.
public static String getSimpleMethodSignature(Method method);
}public class DataSetAPICompletenessTest extends ScalaAPICompletenessTestBase {
@Test
public void testDataSetAPI() {
    // Compare the Java DataSet with its Scala counterpart through the two-argument
    // overload, i.e. without passing an explicit exclusion set.
    compareApis(
        org.apache.flink.api.java.DataSet.class,
        org.apache.flink.api.scala.DataSet.class);
}
/**
 * Returns the method names that this test leaves out of the comparison.
 *
 * @return a fresh, mutable set of excluded method names
 */
@Override
protected Set<String> getExcludedMethods() {
    final String[] skippedNames = {
        "$plus$plus",               // Scala ++ operator
        "$colon$colon",             // Scala :: operator
        "clean",                    // internal method
        "getExecutionEnvironment",  // internal method
    };
    final Set<String> result = new HashSet<>();
    for (String skipped : skippedNames) {
        result.add(skipped);
    }
    return result;
}
/**
 * Decides whether a method is left out of the API comparison.
 *
 * @param method the reflected method to inspect
 * @return {@code true} for synthetic methods, bridge methods and methods whose name
 *         contains {@code $}; otherwise whatever the superclass decides
 */
@Override
protected boolean isExcluded(Method method) {
    final boolean compilerGenerated = method.isSynthetic() || method.isBridge();
    // A '$' in the name marks a Scala-specific naming pattern.
    if (compilerGenerated || method.getName().contains("$")) {
        return true;
    }
    return super.isExcluded(method);
}
}public class Tuple2ComparatorTest extends TupleComparatorTestBase<Tuple2<String, Integer>> {
/** Builds both comparators under test through the factory hooks of this class. */
@Override
public void setup() {
    this.javaComparator = this.createJavaComparator();
    this.scalaComparator = this.createScalaComparator();
}
/** Runs the equality, serialization and normalization checks inherited from the base class. */
@Test
public void testTuple2Comparison() {
    final Tuple2<String, Integer> left = new Tuple2<>("hello", 42);
    final Tuple2<String, Integer> right = new Tuple2<>("world", 24);

    testComparatorEquality(left, right);
    testComparatorSerialization();
    testComparatorNormalization();
}
/**
 * Supplies the sample tuples for this test.
 *
 * @return four tuples; the last one repeats the first so equality can be exercised
 */
@Override
protected Tuple2<String, Integer>[] getTestData() {
    final Tuple2<String, Integer>[] samples = new Tuple2[4];
    samples[0] = new Tuple2<>("a", 1);
    samples[1] = new Tuple2<>("b", 2);
    samples[2] = new Tuple2<>("c", 3);
    samples[3] = new Tuple2<>("a", 1); // Duplicate for equality testing
    return samples;
}
/**
 * Creates the Java-side comparator over both tuple fields.
 *
 * @return a {@code TupleComparator} built from key positions 0 and 1, a String and an
 *         Int field comparator (each constructed with {@code true}), and the matching
 *         field serializers
 */
@Override
protected TypeComparator<Tuple2<String, Integer>> createJavaComparator() {
    final int[] keyPositions = {0, 1};
    final TypeComparator[] fieldComparators = {
        new StringComparator(true),
        new IntComparator(true)
    };
    final TypeSerializer[] fieldSerializers = {
        StringSerializer.INSTANCE,
        IntSerializer.INSTANCE
    };
    return new TupleComparator<>(keyPositions, fieldComparators, fieldSerializers);
}
/**
 * Creates the Scala-side counterpart of the Java comparator.
 *
 * @return a {@code ScalaTupleComparator} built from key positions 0 and 1 and a String
 *         and an Int field comparator (each constructed with {@code true})
 */
@Override
protected TypeComparator<Tuple2<String, Integer>> createScalaComparator() {
    final int[] keyPositions = {0, 1};
    final TypeComparator[] fieldComparators = {
        new StringComparator(true),
        new IntComparator(true)
    };
    return new ScalaTupleComparator<>(keyPositions, fieldComparators);
}
}public class CustomAPICompletenessTest extends ScalaAPICompletenessTestBase {
/** Compares the Java and Scala DataStream classes, skipping two accessor names. */
@Test
public void testStreamingAPICompleteness() {
    // Names excluded from the comparison on both sides.
    final Set<String> skippedNames = new HashSet<>();
    for (String accessor : new String[] {"javaStream", "scalaStream"}) {
        skippedNames.add(accessor);
    }

    compareApis(
        org.apache.flink.streaming.api.datastream.DataStream.class,
        org.apache.flink.streaming.api.scala.DataStream.class,
        skippedNames);
}
/**
 * Compares the public methods of a Java class and a Scala class by signature and fails
 * if either side has a signature the other lacks.
 *
 * <p>Methods are collected through {@code getFilteredMethods}, turned into strings with
 * {@code getMethodSignature}, and the two signature sets are diffed in both directions.
 * Each non-empty difference is passed to {@code reportMissingMethods} before the
 * assertions run.
 *
 * @param javaClass the Java API class
 * @param scalaClass the Scala API class
 * @param excludedMethods method names to leave out of the comparison on both sides
 * @throws AssertionError if a signature exists on only one of the two classes
 */
@Override
protected void compareApis(Class<?> javaClass, Class<?> scalaClass, Set<String> excludedMethods) {
    LOG.info("Comparing {} with {}", javaClass.getSimpleName(), scalaClass.getSimpleName());
    Set<Method> javaMethods = getFilteredMethods(javaClass, excludedMethods);
    Set<Method> scalaMethods = getFilteredMethods(scalaClass, excludedMethods);

    Set<String> javaSignatures = javaMethods.stream()
        .map(this::getMethodSignature)
        .collect(Collectors.toSet());
    Set<String> scalaSignatures = scalaMethods.stream()
        .map(this::getMethodSignature)
        .collect(Collectors.toSet());

    // Signatures present in Java but absent from Scala.
    Set<String> missingInScala = new HashSet<>(javaSignatures);
    missingInScala.removeAll(scalaSignatures);

    // Signatures present in Scala but absent from Java.
    Set<String> missingInJava = new HashSet<>(scalaSignatures);
    missingInJava.removeAll(javaSignatures);

    // Report findings
    if (!missingInScala.isEmpty()) {
        reportMissingMethods(missingInScala, "Scala API");
    }
    if (!missingInJava.isEmpty()) {
        reportMissingMethods(missingInJava, "Java API");
    }

    // Assertions. The second message previously read "Java API has extra methods",
    // which is the opposite of what missingInJava contains.
    assertTrue("Scala API missing methods: " + missingInScala, missingInScala.isEmpty());
    assertTrue("Java API missing methods: " + missingInJava, missingInJava.isEmpty());
    LOG.info("API comparison successful: {} methods verified", javaSignatures.size());
}
/**
 * Collects the public methods of a class, minus the excluded names, by delegating to
 * {@code ReflectionTestUtils.getPublicMethods}.
 *
 * @param clazz the class to inspect
 * @param excludedMethods method names passed on as exclusions
 * @return the set returned by the reflection utility
 */
private Set<Method> getFilteredMethods(Class<?> clazz, Set<String> excludedMethods) {
    final Set<Method> publicMethods = ReflectionTestUtils.getPublicMethods(clazz, excludedMethods);
    return publicMethods;
}
}public class OperatorAPICompletenessTest extends ScalaAPICompletenessTestBase {
/** Checks DataSet, DataStream and KeyedStream, in that order, stopping at the first failure. */
@Test
public void testAllOperatorAPIs() {
    // Each row pairs a label with its {Java class, Scala class}.
    final String[] labels = {"DataSet", "DataStream", "KeyedStream"};
    final Class<?>[][] classPairs = {
        {
            org.apache.flink.api.java.DataSet.class,
            org.apache.flink.api.scala.DataSet.class
        },
        {
            org.apache.flink.streaming.api.datastream.DataStream.class,
            org.apache.flink.streaming.api.scala.DataStream.class
        },
        {
            org.apache.flink.streaming.api.datastream.KeyedStream.class,
            org.apache.flink.streaming.api.scala.KeyedStream.class
        },
    };

    for (int i = 0; i < labels.length; i++) {
        testOperatorAPI(labels[i], classPairs[i][0], classPairs[i][1]);
    }
}
/**
 * Runs one API comparison and logs the outcome under the given label.
 *
 * @param apiName label used in log lines and in the failure message
 * @param javaClass the Java API class
 * @param scalaClass the Scala API class
 * @throws AssertionError wrapping the original failure as its cause when the comparison fails
 */
private void testOperatorAPI(String apiName, Class<?> javaClass, Class<?> scalaClass) {
    LOG.info("Testing {} API completeness", apiName);
    try {
        compareApis(javaClass, scalaClass);
        LOG.info("{} API comparison successful", apiName);
    } catch (AssertionError failure) {
        final String detail = failure.getMessage();
        LOG.error("{} API comparison failed: {}", apiName, detail);
        // Keep the original failure as the cause so its message is not lost.
        final String summary = apiName + " API completeness test failed";
        throw new AssertionError(summary, failure);
    }
}
/**
 * Returns the superclass exclusions plus the names skipped for every operator API.
 *
 * <p>The superclass result is copied before names are added. The previous version
 * added to the returned set directly, which would alter shared state (or throw) if the
 * superclass ever hands back a cached or unmodifiable set.
 *
 * @return a fresh, mutable set of excluded method names
 */
@Override
protected Set<String> getExcludedMethods() {
    Set<String> excluded = new HashSet<>(super.getExcludedMethods());
    // Common exclusions across all operator APIs
    excluded.add("returns"); // Type hint methods
    excluded.add("getType");
    excluded.add("getTransformation");
    // Scala collection interop methods
    excluded.add("asScala");
    excluded.add("asJava");
    return excluded;
}
}public class SerializationAPICompletenessTest extends ScalaAPICompletenessTestBase {
/** Compares the serialization methods of the Java and Scala tuple serializer classes. */
@Test
public void testSerializerAPIs() {
    final Class<?> javaSerializer =
        org.apache.flink.api.common.typeutils.base.TupleSerializer.class;
    final Class<?> scalaSerializer =
        org.apache.flink.api.scala.typeutils.ScalaTupleSerializer.class;

    compareSerializerAPIs("Tuple2Serializer", javaSerializer, scalaSerializer);
}
/**
 * Compares only the serialization-related public methods of two serializer classes.
 *
 * @param serializerName label used in the log line and passed on as assertion context
 * @param javaClass the Java serializer class
 * @param scalaClass the Scala serializer class
 */
private void compareSerializerAPIs(String serializerName, Class<?> javaClass, Class<?> scalaClass) {
    LOG.info("Comparing {} implementations", serializerName);

    // Keep only the methods that isSerializationMethod accepts, on each side.
    final Set<Method> javaSide = Arrays.stream(javaClass.getMethods())
        .filter(this::isSerializationMethod)
        .collect(Collectors.toSet());
    final Set<Method> scalaSide = Arrays.stream(scalaClass.getMethods())
        .filter(this::isSerializationMethod)
        .collect(Collectors.toSet());

    compareMethodSets(javaSide, scalaSide, serializerName);
}
/**
 * Tells whether a method is one of the serialization entry points compared by this test.
 *
 * @param method the reflected method to classify
 * @return {@code true} if the name is exactly {@code serialize}, {@code deserialize},
 *         {@code getLength} or {@code copy}
 */
private boolean isSerializationMethod(Method method) {
    switch (method.getName()) {
        case "serialize":
        case "deserialize":
        case "getLength":
        case "copy":
            return true;
        default:
            return false;
    }
}
/**
 * Asserts that two method sets produce the same set of signature strings.
 *
 * @param javaMethods methods from the Java class
 * @param scalaMethods methods from the Scala class
 * @param context prefix for the assertion message
 */
private void compareMethodSets(Set<Method> javaMethods, Set<Method> scalaMethods, String context) {
    // Expected side: Java signatures.
    final Set<String> expected =
        javaMethods.stream().map(this::getMethodSignature).collect(Collectors.toSet());
    // Actual side: Scala signatures.
    final Set<String> actual =
        scalaMethods.stream().map(this::getMethodSignature).collect(Collectors.toSet());

    final String message = context + " method signatures should match";
    assertEquals(message, expected, actual);
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests-2-10