Comprehensive testing utilities for Apache Flink stream and batch processing applications

---

Utilities for POJO serialization verification, JAR packaging validation, resource discovery, and parameter property handling to ensure comprehensive testing coverage. These utilities provide validation capabilities for various aspects of Flink application development and deployment.

Utilities for validating that classes are properly serialized as POJOs by Flink's type system, ensuring optimal serialization performance.
// API summary: PojoTestUtils from flink-test-utils.
@PublicEvolving
public class PojoTestUtils {
    // Asserts that Flink's type extraction handles the given class with the
    // POJO serializer (individual fields may still fall back to Kryo).
    public static <T> void assertSerializedAsPojo(Class<T> clazz);
    // Stricter variant: additionally fails if any field would require Kryo.
    public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz);
}

import org.apache.flink.types.PojoTestUtils;
// Verify that UserRecord is handled by Flink's POJO serializer.
@Test
void testUserRecordIsPojo() {
    final Class<UserRecord> subject = UserRecord.class;
    PojoTestUtils.assertSerializedAsPojo(subject);
}
// Verify POJO serialization with no Kryo fallback at all.
@Test
void testStrictPojoSerialization() {
    final Class<CustomerData> subject = CustomerData.class;
    PojoTestUtils.assertSerializedAsPojoWithoutKryo(subject);
}
// Example of a Flink-compliant POJO: public no-arg constructor plus public
// fields (or getter/setter pairs) for every non-transient field.
public static class UserRecord {
    public String name;
    public int age;
    public String email;

    /** No-arg constructor required by Flink's POJO serializer. */
    public UserRecord() {}

    /** Convenience constructor for tests. */
    public UserRecord(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }

    // Accessors: optional here because the fields are public, but required
    // for POJO compliance when fields are private.
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
}

Utilities for verifying JAR file contents and structure, ensuring proper packaging for Flink applications and dependencies.
// API summary: PackagingTestUtils from flink-test-utils.
public class PackagingTestUtils {
    // Asserts every entry in the JAR matches at least one of the given
    // regex patterns.
    public static void assertJarContainsOnlyFilesMatching(
            Path jarPath, Collection<String> allowedPaths) throws IOException;
    // Asserts the JAR contains a META-INF/services entry for the given
    // service interface.
    public static void assertJarContainsServiceEntry(
            Path jarPath, Class<?> service) throws IOException;
}

import org.apache.flink.packaging.PackagingTestUtils;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
// Validate the application JAR: only whitelisted paths, plus the
// TableFactory service entry required for SPI discovery.
@Test
void testJarPackaging() throws IOException {
    final Path jarPath = Paths.get("target/my-flink-app.jar");

    // Every entry in the JAR must match one of these path regexes.
    final Collection<String> allowedPaths = Arrays.asList(
            "com/mycompany/flink/.*",
            "META-INF/.*",
            "org/apache/flink/.*");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(jarPath, allowedPaths);

    // The TableFactory service entry must be declared for discovery.
    PackagingTestUtils.assertJarContainsServiceEntry(
            jarPath, org.apache.flink.table.factories.TableFactory.class);
}
// Validate a connector JAR: only its own classes plus service metadata,
// and the DynamicTableFactory SPI entry.
@Test
void testConnectorJarStructure() throws IOException {
    Path connectorJar = Paths.get("target/my-connector.jar");

    // A connector JAR may only ship its own classes and service metadata.
    Collection<String> connectorPaths = Arrays.asList(
            "com/mycompany/connector/.*",
            "META-INF/services/.*",
            "META-INF/MANIFEST.MF");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(connectorJar, connectorPaths);

    // The DynamicTableFactory service entry is required for SPI discovery.
    PackagingTestUtils.assertJarContainsServiceEntry(
            connectorJar, org.apache.flink.table.factories.DynamicTableFactory.class);
}

Utilities for finding and managing test resources using regex patterns, enabling flexible resource location in test environments.
// API summary: ResourceTestUtils from flink-test-utils.
public class ResourceTestUtils {
    // Resolves a single test resource whose path matches the given regex;
    // presumably fails if zero or multiple resources match — confirm in docs.
    public static Path getResource(String resourceNameRegex) throws IOException;
}

import org.apache.flink.test.resources.ResourceTestUtils;
import java.nio.file.Path;
// Discover test resources by regex instead of hard-coded paths, then load
// and verify the configuration file that was found.
@Test
void testResourceDiscovery() throws IOException {
    // Locate the configuration and test-data files on the classpath.
    final Path configFile = ResourceTestUtils.getResource(".*application\\.properties");
    assertTrue(Files.exists(configFile));

    final Path testDataFile = ResourceTestUtils.getResource(".*test-data\\.csv");
    assertTrue(Files.exists(testDataFile));

    // Load the discovered configuration; stream is closed automatically.
    final Properties config = new Properties();
    try (InputStream is = Files.newInputStream(configFile)) {
        config.load(is);
    }

    // The configuration must define a parallelism setting.
    assertNotNull(config.getProperty("flink.parallelism"));
}
// Discover an Avro schema on the classpath and validate it parses to a
// record schema.
@Test
void testSchemaResourceLoading() throws IOException {
    // Find any .avsc schema file on the test classpath.
    Path schemaFile = ResourceTestUtils.getResource(".*\\.avsc");

    // Parse inside try-with-resources so the stream is always closed
    // (the original leaked the InputStream).
    try (InputStream schemaStream = Files.newInputStream(schemaFile)) {
        Schema schema = new Schema.Parser().parse(schemaStream);
        assertNotNull(schema);
        assertEquals(Schema.Type.RECORD, schema.getType());
    }
}

System property-based parameter management with type conversion and default value support for flexible test configuration.
// API summary: ParameterProperty (system-property-backed test parameter).
public class ParameterProperty<V> {
    // converter turns the raw system-property string into a V.
    public ParameterProperty(String propertyName, Function<String, V> converter);
    public String getPropertyName();
    // Empty when the system property is not set.
    public Optional<V> get();
    // Returns defaultValue when the system property is not set.
    public V get(V defaultValue);
}

import org.apache.flink.test.parameters.ParameterProperty;
import java.util.function.Function;
// Define parameter properties with type conversion
// Reads -Dtest.parallelism, converted via Integer.parseInt.
ParameterProperty<Integer> parallelismProperty = new ParameterProperty<>(
"test.parallelism", Integer::parseInt);
// Reads -Dtest.job.name as a plain String (identity conversion).
ParameterProperty<String> jobNameProperty = new ParameterProperty<>(
"test.job.name", Function.identity());
// Reads -Dtest.checkpointing.enabled, converted via Boolean.parseBoolean.
ParameterProperty<Boolean> enableCheckpointingProperty = new ParameterProperty<>(
"test.checkpointing.enabled", Boolean::parseBoolean);
// Resolve parameters (with defaults), configure the environment, and run a
// small job using the configured values.
@Test
void testParameterConfiguration() {
    // Fall back to sensible defaults when the properties are unset.
    final int parallelism = parallelismProperty.get(4);
    final String jobName = jobNameProperty.get("DefaultTestJob");
    final boolean checkpointingEnabled = enableCheckpointingProperty.get(false);

    // Apply the resolved settings to the test environment.
    final StreamExecutionEnvironment env = getTestEnvironment();
    env.setParallelism(parallelism);
    if (checkpointingEnabled) {
        env.enableCheckpointing(1000);
    }

    // Build a small job using the configured parameters.
    env.fromElements(1, 2, 3, 4, 5)
            .map(x -> x * parallelism)
            .print(jobName);
}
// Branch on whether an optional parameter was supplied at all.
@Test
void testOptionalParameters() {
    // get() without a default returns Optional.empty() when the
    // system property is not set.
    Optional<Integer> maxParallelism = parallelismProperty.get();
    // Prefer ifPresentOrElse over the isPresent()/get() pattern.
    maxParallelism.ifPresentOrElse(
            this::configureWithMaxParallelism,
            this::configureAutomatically);
}

// Custom type conversion
// Any Function<String, V> can serve as the converter.
ParameterProperty<Duration> timeoutProperty =
        new ParameterProperty<>("test.timeout", Duration::parse);
ParameterProperty<List<String>> topicsProperty =
        new ParameterProperty<>(
                "test.kafka.topics", csv -> Arrays.asList(csv.split(",")));
// Configuration class pattern: centralize ParameterProperty definitions as
// constants so property names and defaults live in one place.
public class TestConfiguration {
    private static final ParameterProperty<String> KAFKA_BOOTSTRAP_SERVERS =
            new ParameterProperty<>("test.kafka.bootstrap.servers", Function.identity());
    private static final ParameterProperty<Integer> KAFKA_PARTITIONS =
            new ParameterProperty<>("test.kafka.partitions", Integer::parseInt);

    /** Kafka bootstrap servers, defaulting to a local broker. */
    public String getKafkaBootstrapServers() {
        return KAFKA_BOOTSTRAP_SERVERS.get("localhost:9092");
    }

    /** Kafka partition count, defaulting to 1. */
    public int getKafkaPartitions() {
        return KAFKA_PARTITIONS.get(1);
    }
}

Complete validation suite for a Flink application including serialization, packaging, and configuration.
// End-to-end validation: POJO serialization, JAR packaging, resource
// availability, and parameter configuration in a single test.
@Test
void testApplicationValidation() throws IOException {
    // 1. Critical record types must use Flink's POJO serializer.
    PojoTestUtils.assertSerializedAsPojo(OrderRecord.class);
    PojoTestUtils.assertSerializedAsPojo(CustomerRecord.class);
    PojoTestUtils.assertSerializedAsPojo(ProductRecord.class);

    // 2. The application JAR must contain only whitelisted paths.
    Path applicationJar = Paths.get("target/order-processing-app.jar");
    Collection<String> allowedPaths = Arrays.asList(
            "com/mycompany/orders/.*",
            "META-INF/.*",
            "org/apache/flink/.*");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(applicationJar, allowedPaths);

    // 3. Required configuration resources must be discoverable.
    Path configFile = ResourceTestUtils.getResource(".*application\\.conf");
    assertTrue(Files.exists(configFile));

    // 4. Parameter configuration must resolve to sane values.
    TestConfiguration config = new TestConfiguration();
    assertNotNull(config.getKafkaBootstrapServers());
    assertTrue(config.getKafkaPartitions() > 0);
}

Validation utilities designed for continuous integration and deployment pipelines.
// Environment-aware validation for CI/CD: pick the validator matching the
// deployment environment, then check the deployment artifact.
@Test
void testCiCdValidation() throws IOException {
    // Resolve the target environment from a system property (default "test").
    final ParameterProperty<String> environmentProperty = new ParameterProperty<>(
            "deployment.environment", Function.identity());
    final String environment = environmentProperty.get("test");

    // Dispatch to the environment-specific validation.
    switch (environment) {
        case "production":
            validateProductionConfiguration();
            break;
        case "staging":
            validateStagingConfiguration();
            break;
        default:
            validateTestConfiguration();
    }

    // Locate and structurally validate the deployment artifact.
    final Path deploymentJar = ResourceTestUtils.getResource(".*-deployment\\.jar");
    validateDeploymentJarStructure(deploymentJar);
}
/**
 * Production-specific checks: debug mode must never be enabled.
 *
 * @throws IOException declared for symmetry with the other validators
 */
private void validateProductionConfiguration() throws IOException {
    ParameterProperty<Boolean> debugEnabled = new ParameterProperty<>(
            "flink.debug.enabled", Boolean::parseBoolean);
    assertFalse(debugEnabled.get(false), "Debug should be disabled in production");
}

Ensuring optimal serialization performance through POJO validation.
// Guard serialization performance: hot data types must be POJOs, and the
// hottest type must not fall back to Kryo at all.
@Test
void testSerializationPerformance() {
    // Same three assertions as before, expressed as a loop.
    for (Class<?> type : Arrays.asList(Event.class, Measurement.class, Alert.class)) {
        PojoTestUtils.assertSerializedAsPojo(type);
    }
    // No Kryo fallback allowed for the performance-critical type.
    PojoTestUtils.assertSerializedAsPojoWithoutKryo(HighFrequencyEvent.class);
}
// Performance-critical event class: a plain POJO so Flink uses its fast
// POJO serializer instead of Kryo. Public fields satisfy the POJO rules
// without needing getters/setters.
public static class HighFrequencyEvent {
    public long timestamp;
    public String sensorId;
    public double value;
    public String status;

    /** No-arg constructor required by Flink's POJO serializer. */
    public HighFrequencyEvent() {}
    // Constructor, getters, and setters...
}

Install with Tessl CLI
`npx tessl i tessl/maven-org-apache-flink--flink-test-utils`