Framework for verifying API parity between Java and Scala implementations using reflection-based method comparison. Essential for ensuring consistent API coverage across language bindings.
Abstract base class providing infrastructure for comparing Java and Scala API implementations using reflection.
public abstract class ScalaAPICompletenessTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(ScalaAPICompletenessTestBase.class);
// Core comparison methods
protected void compareApis(Class<?> javaClass, Class<?> scalaClass);
protected void compareApis(Class<?> javaClass, Class<?> scalaClass, Set<String> excludedMethods);
// Method filtering and exclusion
protected boolean isExcluded(Method method);
protected boolean isExcluded(String methodName);
protected Set<String> getExcludedMethods();
// Comparison utilities
protected boolean methodsMatch(Method javaMethod, Method scalaMethod);
protected String getMethodSignature(Method method);
protected void reportMissingMethods(Set<String> missingMethods, String apiType);
}Abstract base class for testing tuple comparator implementations between Java and Scala.
public abstract class TupleComparatorTestBase<T> {
protected TypeComparator<T> javaComparator;
protected TypeComparator<T> scalaComparator;
@Before
public abstract void setup();
// Comparison testing methods
protected void testComparatorEquality(T value1, T value2);
protected void testComparatorSerialization();
protected void testComparatorNormalization();
// Utility methods
protected abstract T[] getTestData();
protected abstract TypeComparator<T> createJavaComparator();
protected abstract TypeComparator<T> createScalaComparator();
}Abstract base class for testing pair comparator implementations.
public abstract class PairComparatorTestBase<T1, T2> {
protected TypePairComparator<T1, T2> javaPairComparator;
protected TypePairComparator<T1, T2> scalaPairComparator;
@Before
public abstract void setup();
// Pair comparison testing
protected void testPairComparatorEquality(T1 first1, T2 second1, T1 first2, T2 second2);
protected void testPairComparatorSerialization();
// Abstract methods for test data
protected abstract T1[] getFirstTypeTestData();
protected abstract T2[] getSecondTypeTestData();
protected abstract TypePairComparator<T1, T2> createJavaPairComparator();
protected abstract TypePairComparator<T1, T2> createScalaPairComparator();
}Utility for comparing method signatures between Java and Scala implementations.
public class MethodSignatureComparator {
public static boolean signaturesMatch(Method javaMethod, Method scalaMethod);
public static String normalizeSignature(Method method);
public static boolean returnTypesCompatible(Class<?> javaReturn, Class<?> scalaReturn);
public static boolean parametersCompatible(Class<?>[] javaParams, Class<?>[] scalaParams);
}Utilities for reflection-based testing and method analysis.
public class ReflectionTestUtils {
public static Set<Method> getPublicMethods(Class<?> clazz);
public static Set<Method> getPublicMethods(Class<?> clazz, Set<String> excludedNames);
public static boolean isMethodExcluded(Method method, Set<String> exclusions);
public static String getSimpleMethodSignature(Method method);
}public class DataSetAPICompletenessTest extends ScalaAPICompletenessTestBase {
@Test
public void testDataSetAPI() {
Class<?> javaDataSet = org.apache.flink.api.java.DataSet.class;
Class<?> scalaDataSet = org.apache.flink.api.scala.DataSet.class;
// Compare APIs with standard exclusions
compareApis(javaDataSet, scalaDataSet);
}
@Override
protected Set<String> getExcludedMethods() {
Set<String> excluded = new HashSet<>();
// Exclude Scala-specific methods
excluded.add("$plus$plus"); // Scala ++ operator
excluded.add("$colon$colon"); // Scala :: operator
// Exclude internal methods
excluded.add("clean");
excluded.add("getExecutionEnvironment");
return excluded;
}
@Override
protected boolean isExcluded(Method method) {
// Additional custom exclusion logic
String methodName = method.getName();
// Exclude synthetic methods
if (method.isSynthetic()) {
return true;
}
// Exclude bridge methods
if (method.isBridge()) {
return true;
}
// Exclude Scala-specific naming patterns
if (methodName.contains("$")) {
return true;
}
return super.isExcluded(method);
}
}public class Tuple2ComparatorTest extends TupleComparatorTestBase<Tuple2<String, Integer>> {
@Override
public void setup() {
javaComparator = createJavaComparator();
scalaComparator = createScalaComparator();
}
@Test
public void testTuple2Comparison() {
Tuple2<String, Integer> tuple1 = new Tuple2<>("hello", 42);
Tuple2<String, Integer> tuple2 = new Tuple2<>("world", 24);
testComparatorEquality(tuple1, tuple2);
testComparatorSerialization();
testComparatorNormalization();
}
@Override
protected Tuple2<String, Integer>[] getTestData() {
return new Tuple2[]{
new Tuple2<>("a", 1),
new Tuple2<>("b", 2),
new Tuple2<>("c", 3),
new Tuple2<>("a", 1), // Duplicate for equality testing
};
}
@Override
protected TypeComparator<Tuple2<String, Integer>> createJavaComparator() {
return new TupleComparator<>(
new int[]{0, 1}, // Key positions
new TypeComparator[]{
new StringComparator(true),
new IntComparator(true)
},
new TypeSerializer[]{
StringSerializer.INSTANCE,
IntSerializer.INSTANCE
}
);
}
@Override
protected TypeComparator<Tuple2<String, Integer>> createScalaComparator() {
// Create Scala equivalent comparator
return new ScalaTupleComparator<>(
new int[]{0, 1},
new TypeComparator[]{
new StringComparator(true),
new IntComparator(true)
}
);
}
}public class CustomAPICompletenessTest extends ScalaAPICompletenessTestBase {
@Test
public void testStreamingAPICompleteness() {
Class<?> javaDataStream = org.apache.flink.streaming.api.datastream.DataStream.class;
Class<?> scalaDataStream = org.apache.flink.streaming.api.scala.DataStream.class;
Set<String> customExclusions = new HashSet<>();
customExclusions.add("javaStream"); // Scala-specific bridge method
customExclusions.add("scalaStream"); // Java-specific bridge method
compareApis(javaDataStream, scalaDataStream, customExclusions);
}
@Override
protected void compareApis(Class<?> javaClass, Class<?> scalaClass, Set<String> excludedMethods) {
LOG.info("Comparing {} with {}", javaClass.getSimpleName(), scalaClass.getSimpleName());
Set<Method> javaMethods = getFilteredMethods(javaClass, excludedMethods);
Set<Method> scalaMethods = getFilteredMethods(scalaClass, excludedMethods);
// Find methods in Java but not in Scala
Set<String> javaSignatures = javaMethods.stream()
.map(this::getMethodSignature)
.collect(Collectors.toSet());
Set<String> scalaSignatures = scalaMethods.stream()
.map(this::getMethodSignature)
.collect(Collectors.toSet());
Set<String> missingInScala = new HashSet<>(javaSignatures);
missingInScala.removeAll(scalaSignatures);
Set<String> missingInJava = new HashSet<>(scalaSignatures);
missingInJava.removeAll(javaSignatures);
// Report findings
if (!missingInScala.isEmpty()) {
reportMissingMethods(missingInScala, "Scala API");
}
if (!missingInJava.isEmpty()) {
reportMissingMethods(missingInJava, "Java API");
}
// Assertions
assertTrue("Scala API missing methods: " + missingInScala, missingInScala.isEmpty());
assertTrue("Java API has extra methods: " + missingInJava, missingInJava.isEmpty());
LOG.info("API comparison successful: {} methods verified", javaSignatures.size());
}
private Set<Method> getFilteredMethods(Class<?> clazz, Set<String> excludedMethods) {
return ReflectionTestUtils.getPublicMethods(clazz, excludedMethods);
}
}public class OperatorAPICompletenessTest extends ScalaAPICompletenessTestBase {
@Test
public void testAllOperatorAPIs() {
// Test core transformation operators
testOperatorAPI("DataSet",
org.apache.flink.api.java.DataSet.class,
org.apache.flink.api.scala.DataSet.class);
testOperatorAPI("DataStream",
org.apache.flink.streaming.api.datastream.DataStream.class,
org.apache.flink.streaming.api.scala.DataStream.class);
testOperatorAPI("KeyedStream",
org.apache.flink.streaming.api.datastream.KeyedStream.class,
org.apache.flink.streaming.api.scala.KeyedStream.class);
}
private void testOperatorAPI(String apiName, Class<?> javaClass, Class<?> scalaClass) {
LOG.info("Testing {} API completeness", apiName);
try {
compareApis(javaClass, scalaClass);
LOG.info("{} API comparison successful", apiName);
} catch (AssertionError e) {
LOG.error("{} API comparison failed: {}", apiName, e.getMessage());
throw new AssertionError(apiName + " API completeness test failed", e);
}
}
@Override
protected Set<String> getExcludedMethods() {
Set<String> excluded = super.getExcludedMethods();
// Common exclusions across all operator APIs
excluded.add("returns"); // Type hint methods
excluded.add("getType");
excluded.add("getTransformation");
// Scala collection interop methods
excluded.add("asScala");
excluded.add("asJava");
return excluded;
}
}public class SerializationAPICompletenessTest extends ScalaAPICompletenessTestBase {
@Test
public void testSerializerAPIs() {
compareSerializerAPIs(
"Tuple2Serializer",
org.apache.flink.api.common.typeutils.base.TupleSerializer.class,
org.apache.flink.api.scala.typeutils.ScalaTupleSerializer.class
);
}
private void compareSerializerAPIs(String serializerName, Class<?> javaClass, Class<?> scalaClass) {
LOG.info("Comparing {} implementations", serializerName);
// Get serialization-specific methods
Set<Method> javaMethods = Arrays.stream(javaClass.getMethods())
.filter(m -> isSerializationMethod(m))
.collect(Collectors.toSet());
Set<Method> scalaMethods = Arrays.stream(scalaClass.getMethods())
.filter(m -> isSerializationMethod(m))
.collect(Collectors.toSet());
// Compare method signatures
compareMethodSets(javaMethods, scalaMethods, serializerName);
}
private boolean isSerializationMethod(Method method) {
String name = method.getName();
return name.equals("serialize") ||
name.equals("deserialize") ||
name.equals("getLength") ||
name.equals("copy");
}
private void compareMethodSets(Set<Method> javaMethods, Set<Method> scalaMethods, String context) {
Set<String> javaSignatures = javaMethods.stream()
.map(this::getMethodSignature)
.collect(Collectors.toSet());
Set<String> scalaSignatures = scalaMethods.stream()
.map(this::getMethodSignature)
.collect(Collectors.toSet());
assertEquals(context + " method signatures should match", javaSignatures, scalaSignatures);
}
}