CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils

Comprehensive testing utilities for Apache Flink stream and batch processing applications

Pending
Overview
Eval results
Files

docs/validation-utilities.md

Validation Utilities

Utilities for POJO serialization verification, JAR packaging validation, resource discovery, and parameter property handling to ensure comprehensive testing coverage. These utilities provide validation capabilities for various aspects of Flink application development and deployment.

Capabilities

POJO Serialization Testing

Utilities for validating that classes are properly serialized as POJOs by Flink's type system, ensuring optimal serialization performance.

/**
 * API summary (method bodies omitted): assertions that check whether Flink's type system
 * serializes a class as a POJO.
 */
@PublicEvolving
public class PojoTestUtils {
    /** Asserts that {@code clazz} is serialized as a POJO. */
    public static <T> void assertSerializedAsPojo(Class<T> clazz);
    /** Asserts that {@code clazz} is serialized as a POJO without a Kryo fallback. */
    public static <T> void assertSerializedAsPojoWithoutKryo(Class<T> clazz);
}

Usage Example

import org.apache.flink.types.PojoTestUtils;

/** Verifies that {@code UserRecord} is serialized as a POJO. */
@Test
void testUserRecordIsPojo() {
    final Class<UserRecord> recordType = UserRecord.class;
    PojoTestUtils.assertSerializedAsPojo(recordType);
}

/** Verifies that {@code CustomerData} is serialized as a POJO with no Kryo fallback. */
@Test
void testStrictPojoSerialization() {
    final Class<CustomerData> dataType = CustomerData.class;
    PojoTestUtils.assertSerializedAsPojoWithoutKryo(dataType);
}

/**
 * Example POJO used by the serialization checks: three public fields, a public
 * no-argument constructor, and a getter/setter pair per field.
 */
public static class UserRecord {
    public String name;
    public int age;
    public String email;

    /** No-argument constructor; the example marks it as required. */
    public UserRecord() {
    }

    /** Creates a fully populated record. */
    public UserRecord(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }

    // --- name ---

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // --- age ---

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    // --- email ---

    public String getEmail() {
        return this.email;
    }

    public void setEmail(String email) {
        this.email = email;
    }
}

JAR Packaging Validation

Utilities for verifying JAR file contents and structure, ensuring proper packaging for Flink applications and dependencies.

/**
 * API summary (method bodies omitted): assertions about the contents and structure of a JAR file.
 */
public class PackagingTestUtils {
    /**
     * Asserts that the JAR at {@code jarPath} contains only files matching {@code allowedPaths}.
     * NOTE(review): the usage examples pass regex-style patterns such as {@code "META-INF/.*"};
     * confirm the exact matching semantics against the implementation.
     */
    public static void assertJarContainsOnlyFilesMatching(
        Path jarPath, Collection<String> allowedPaths) throws IOException;
    /** Asserts that the JAR at {@code jarPath} contains a service entry for {@code service}. */
    public static void assertJarContainsServiceEntry(
        Path jarPath, Class<?> service) throws IOException;
}

Usage Example

import org.apache.flink.packaging.PackagingTestUtils;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

/** Checks the application JAR's contents and its registered service entry. */
@Test
void testJarPackaging() throws IOException {
    final Path appJar = Paths.get("target/my-flink-app.jar");

    // Only entries matching one of these patterns may appear in the JAR
    final Collection<String> permittedEntries = Arrays.asList(
        "com/mycompany/flink/.*", "META-INF/.*", "org/apache/flink/.*");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(appJar, permittedEntries);

    // The JAR must also carry a service entry for TableFactory
    final Class<?> requiredService = org.apache.flink.table.factories.TableFactory.class;
    PackagingTestUtils.assertJarContainsServiceEntry(appJar, requiredService);
}

/** Checks the connector JAR's layout and its service-provider entry. */
@Test
void testConnectorJarStructure() throws IOException {
    final Path jar = Paths.get("target/my-connector.jar");

    // Connector classes, service files, and the manifest are the only permitted entries
    final Collection<String> permittedEntries = Arrays.asList(
        "com/mycompany/connector/.*", "META-INF/services/.*", "META-INF/MANIFEST.MF");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(jar, permittedEntries);

    // The connector must register itself as a DynamicTableFactory service
    final Class<?> factoryService = org.apache.flink.table.factories.DynamicTableFactory.class;
    PackagingTestUtils.assertJarContainsServiceEntry(jar, factoryService);
}

Resource Discovery

Utilities for finding and managing test resources using regex patterns, enabling flexible resource location in test environments.

/** API summary (method body omitted): locates test resources by regex. */
public class ResourceTestUtils {
    /**
     * Returns the path of a resource whose name matches {@code resourceNameRegex}.
     * NOTE(review): behavior when no resource, or more than one, matches is not shown here;
     * confirm against the implementation.
     */
    public static Path getResource(String resourceNameRegex) throws IOException;
}

Usage Example

import org.apache.flink.test.resources.ResourceTestUtils;
import java.io.InputStream;
import java.nio.file.Path;

/** Locates test resources by regex and loads the discovered properties file. */
@Test
void testResourceDiscovery() throws IOException {
    // Locate the properties file by name pattern
    final Path propertiesPath = ResourceTestUtils.getResource(".*application\\.properties");
    assertTrue(Files.exists(propertiesPath));

    // Locate the CSV test data the same way
    final Path csvPath = ResourceTestUtils.getResource(".*test-data\\.csv");
    assertTrue(Files.exists(csvPath));

    // Read the discovered properties file
    final Properties loaded = new Properties();
    try (InputStream in = Files.newInputStream(propertiesPath)) {
        loaded.load(in);
    }

    // The parallelism setting must be present
    final String parallelismSetting = loaded.getProperty("flink.parallelism");
    assertNotNull(parallelismSetting);
}

/**
 * Locates an Avro schema file by regex, parses it, and checks that it describes a record.
 *
 * @throws IOException if the resource cannot be located or read
 */
@Test
void testSchemaResourceLoading() throws IOException {
    // Find Avro schema files
    Path schemaFile = ResourceTestUtils.getResource(".*\\.avsc");

    // Load the schema; try-with-resources closes the stream even if parsing fails
    Schema schema;
    try (InputStream is = Files.newInputStream(schemaFile)) {
        schema = new Schema.Parser().parse(is);
    }

    // Validate schema
    assertNotNull(schema);
    assertEquals(Schema.Type.RECORD, schema.getType());
}

Parameter Property Management

System property-based parameter management with type conversion and default value support for flexible test configuration.

/**
 * API summary (method bodies omitted): a system-property-backed test parameter with type
 * conversion and default-value support.
 *
 * @param <V> the type the property value is converted to
 */
public class ParameterProperty<V> {
    /** Creates a parameter for {@code propertyName}; {@code converter} maps the string value to {@code V}. */
    public ParameterProperty(String propertyName, Function<String, V> converter);
    
    /** Returns the property name. */
    public String getPropertyName();
    /** Returns the converted value as an {@code Optional}; presumably empty when the property is not set (confirm). */
    public Optional<V> get();
    /** Returns the converted value, using {@code defaultValue} as the fallback. */
    public V get(V defaultValue);
}

Usage Example

import org.apache.flink.test.parameters.ParameterProperty;
import java.util.function.Function;

/** Parallelism for the test job, parsed as an integer. */
ParameterProperty<Integer> parallelismProperty =
    new ParameterProperty<>("test.parallelism", Integer::parseInt);

/** Job name, used as-is. */
ParameterProperty<String> jobNameProperty =
    new ParameterProperty<>("test.job.name", Function.identity());

/** Checkpointing switch, parsed as a boolean. */
ParameterProperty<Boolean> enableCheckpointingProperty =
    new ParameterProperty<>("test.checkpointing.enabled", Boolean::parseBoolean);

/**
 * Builds and runs a small job whose parallelism, checkpointing, and name come from
 * parameter properties, falling back to defaults when a property is not provided.
 *
 * @throws Exception if job execution fails
 */
@Test
void testParameterConfiguration() throws Exception {
    // Use properties with defaults
    int parallelism = parallelismProperty.get(4);
    String jobName = jobNameProperty.get("DefaultTestJob");
    boolean checkpointingEnabled = enableCheckpointingProperty.get(false);
    
    // Configure test environment
    StreamExecutionEnvironment env = getTestEnvironment();
    env.setParallelism(parallelism);
    
    if (checkpointingEnabled) {
        env.enableCheckpointing(1000);
    }
    
    // Create job with configured parameters
    env.fromElements(1, 2, 3, 4, 5)
       .map(x -> x * parallelism)
       .print(jobName);

    // Flink builds the pipeline lazily: nothing runs until execute() is called
    env.execute(jobName);
}

/** Configures parallelism from the property when it is provided, otherwise automatically. */
@Test
void testOptionalParameters() {
    final Optional<Integer> configured = parallelismProperty.get();

    // No value provided: fall back to automatic configuration
    if (!configured.isPresent()) {
        configureAutomatically();
        return;
    }

    // A value was provided: use it
    configureWithMaxParallelism(configured.get());
}

Advanced Parameter Usage

/** Timeout, converted with {@code Duration.parse}. */
ParameterProperty<Duration> timeoutProperty =
    new ParameterProperty<>("test.timeout", Duration::parse);

/** Kafka topics, given as a comma-separated list. */
ParameterProperty<List<String>> topicsProperty =
    new ParameterProperty<>(
        "test.kafka.topics",
        csv -> {
            String[] topicNames = csv.split(",");
            return Arrays.asList(topicNames);
        });

/**
 * Configuration class pattern: keeps the parameter properties private and exposes
 * typed accessors that supply the defaults.
 */
public class TestConfiguration {
    private static final ParameterProperty<String> BOOTSTRAP_SERVERS =
        new ParameterProperty<>("test.kafka.bootstrap.servers", Function.identity());

    private static final ParameterProperty<Integer> PARTITION_COUNT =
        new ParameterProperty<>("test.kafka.partitions", Integer::parseInt);

    /** Returns the Kafka bootstrap servers, using {@code localhost:9092} as the default. */
    public String getKafkaBootstrapServers() {
        final String servers = BOOTSTRAP_SERVERS.get("localhost:9092");
        return servers;
    }

    /** Returns the Kafka partition count, using {@code 1} as the default. */
    public int getKafkaPartitions() {
        final int partitions = PARTITION_COUNT.get(1);
        return partitions;
    }
}

Usage Patterns

Comprehensive Application Validation

Complete validation suite for a Flink application including serialization, packaging, and configuration.

/** End-to-end validation: POJO serialization, JAR contents, resources, and parameters. */
@Test
void testApplicationValidation() throws IOException {
    // Step 1: the record types must be serialized as POJOs
    PojoTestUtils.assertSerializedAsPojo(OrderRecord.class);
    PojoTestUtils.assertSerializedAsPojo(CustomerRecord.class);
    PojoTestUtils.assertSerializedAsPojo(ProductRecord.class);

    // Step 2: the application JAR may contain only the expected entries
    final Collection<String> permittedEntries = Arrays.asList(
        "com/mycompany/orders/.*", "META-INF/.*", "org/apache/flink/.*");
    final Path appJar = Paths.get("target/order-processing-app.jar");
    PackagingTestUtils.assertJarContainsOnlyFilesMatching(appJar, permittedEntries);

    // Step 3: the configuration resource must be discoverable
    final Path confPath = ResourceTestUtils.getResource(".*application\\.conf");
    assertTrue(Files.exists(confPath));

    // Step 4: parameter-backed settings must resolve to usable values
    final TestConfiguration settings = new TestConfiguration();
    assertNotNull(settings.getKafkaBootstrapServers());
    final int partitions = settings.getKafkaPartitions();
    assertTrue(partitions > 0);
}

CI/CD Pipeline Validation

Validation utilities designed for continuous integration and deployment pipelines.

/** Runs the environment-specific validation, then validates the deployment JAR. */
@Test
void testCiCdValidation() throws IOException {
    // The target environment comes from a parameter and defaults to "test"
    final ParameterProperty<String> envParam =
        new ParameterProperty<>("deployment.environment", Function.identity());
    final String environment = envParam.get("test");

    // Dispatch to the validation that matches the environment
    if (environment.equals("production")) {
        validateProductionConfiguration();
    } else if (environment.equals("staging")) {
        validateStagingConfiguration();
    } else {
        validateTestConfiguration();
    }

    // Locate the deployment JAR and check its structure
    final Path jar = ResourceTestUtils.getResource(".*-deployment\\.jar");
    validateDeploymentJarStructure(jar);
}

/** Production check: the debug flag must be off (it defaults to {@code false}). */
private void validateProductionConfiguration() throws IOException {
    final ParameterProperty<Boolean> debugFlag =
        new ParameterProperty<>("flink.debug.enabled", Boolean::parseBoolean);

    final boolean debugOn = debugFlag.get(false);
    assertFalse(debugOn, "Debug should be disabled in production");
}

Type System Validation

Validate that performance-critical data types are serialized as POJOs, without a Kryo fallback, to ensure optimal serialization performance.

/** Checks that the critical data types are serialized as POJOs. */
@Test
void testSerializationPerformance() {
    // Critical data types must be serialized as POJOs
    final Class<Event> eventType = Event.class;
    final Class<Measurement> measurementType = Measurement.class;
    final Class<Alert> alertType = Alert.class;
    PojoTestUtils.assertSerializedAsPojo(eventType);
    PojoTestUtils.assertSerializedAsPojo(measurementType);
    PojoTestUtils.assertSerializedAsPojo(alertType);

    // The high-frequency type must additionally avoid any Kryo fallback
    final Class<HighFrequencyEvent> hotPathType = HighFrequencyEvent.class;
    PojoTestUtils.assertSerializedAsPojoWithoutKryo(hotPathType);
}

/**
 * Performance-critical event class used in the strict (no Kryo fallback) POJO check.
 * The example is abbreviated: only the fields and the no-argument constructor are shown.
 */
public static class HighFrequencyEvent {
    public long timestamp;
    public String sensorId;
    public double value;
    public String status;
    
    // POJO requirements: default constructor and getters/setters
    // NOTE(review): all fields here are public; confirm whether getters/setters are
    // still needed for POJO treatment in that case.
    public HighFrequencyEvent() {}
    
    // Remaining constructor, getters, and setters omitted for brevity...
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils

docs

index.md

metrics-testing.md

minicluster-management.md

result-verification.md

specialized-connectors.md

test-data-sources.md

test-environments.md

validation-utilities.md

tile.json