tessl/maven-org-apache-flink--flink-test-utils-parent

Comprehensive testing utilities for Apache Flink stream processing framework

Core Testing and Synchronization

Essential testing utilities: thread synchronization primitives, enhanced assertions, JUnit rules, and a manually triggered executor. They provide the foundation for deterministic, reliable Flink unit tests.

Capabilities

Thread Synchronization

Core synchronization primitives for coordinating test execution across multiple threads.

OneShotLatch

A synchronization latch that can be triggered once and allows multiple threads to wait for that trigger event.

/**
 * One-time synchronization latch for test coordination
 * Once fired, it stays triggered until reset() is called
 */
class OneShotLatch {
    /** Fire the latch, releasing all waiting threads */
    void trigger();
    
    /** Wait until the latch is triggered */
    void await() throws InterruptedException;
    
    /** Wait until triggered; throws TimeoutException if the timeout elapses first */
    void await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;
    
    /** Wait until triggered, converting InterruptedException to RuntimeException */
    void awaitQuietly();
    
    /** Check if the latch has been triggered */
    boolean isTriggered();
    
    /** Reset the latch to untriggered state */
    void reset();
}

Usage Examples:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.flink.core.testutils.OneShotLatch;

// Basic synchronization
OneShotLatch latch = new OneShotLatch();

// Background task that signals completion through the latch
Thread worker = new Thread(() -> {
    // Do some work
    latch.trigger(); // Signal completion
});
worker.start();
latch.await(); // Test thread blocks until the worker triggers the latch

// With timeout: the timed await() returns nothing and reports expiry by throwing
try {
    latch.await(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    fail("Worker did not complete within timeout");
}
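
To make the one-shot semantics concrete, the following JDK-only sketch mimics the behavior described above with a plain monitor. `SketchOneShotLatch` and its `demo()` helper are hypothetical names written for illustration, not Flink's implementation, and the sketch assumes that a timed wait reports expiry by throwing `TimeoutException`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in that illustrates one-shot latch semantics; not Flink's source. */
class SketchOneShotLatch {
    private final Object lock = new Object();
    private boolean triggered;

    /** Fires the latch and wakes every waiting thread. */
    void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /** Blocks until triggered, or throws once the timeout has elapsed. */
    void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            // The loop guards against spurious wakeups.
            while (!triggered) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("latch was not triggered in time");
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
            }
        }
    }

    boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }

    /** Demo helper: returns true if a worker thread released the waiter in time. */
    static boolean demo() {
        SketchOneShotLatch latch = new SketchOneShotLatch();
        Thread worker = new Thread(latch::trigger);
        worker.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
            worker.join();
            return latch.isTriggered();
        } catch (InterruptedException | TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("released: " + demo());
    }
}
```

Because the flag is never cleared by `await`, any number of threads can wait on the same latch and all of them are released by a single `trigger()`.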

MultiShotLatch

A reusable synchronization latch that automatically resets after each await() call.

/**
 * Reusable synchronization latch that resets after each await
 * Useful for repeated coordination patterns
 */
class MultiShotLatch {
    /** Fire the latch for the next waiting thread */
    void trigger();
    
    /** Wait until triggered, then automatically reset */
    void await() throws InterruptedException;
    
    /** Check if the latch is currently triggered */
    boolean isTriggered();
}

Usage Examples:

import org.apache.flink.core.testutils.MultiShotLatch;

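MultiShotLatch itemReady = new MultiShotLatch();
MultiShotLatch itemConsumed = new MultiShotLatch();

// Producer-consumer handshake. Triggers that arrive before the matching
// await() are not counted, so the producer waits for the consumer after each item.
Thread producer = new Thread(() -> {
    try {
        for (int i = 0; i < 10; i++) {
            // Produce item
            itemReady.trigger();    // Signal item ready
            itemConsumed.await();   // Wait until the consumer has taken it
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

Thread consumer = new Thread(() -> {
    try {
        for (int i = 0; i < 10; i++) {
            itemReady.await();      // Wait for next item
            // Consume item
            itemConsumed.trigger(); // Signal that the item was consumed
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

producer.start();
consumer.start();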
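
The auto-reset behavior can be illustrated with a small JDK-only sketch. `SketchMultiShotLatch` is a hypothetical stand-in, not Flink's source; it assumes the latch is backed by a single boolean flag, which is why two triggers issued before one `await()` behave like one.

```java
/** Hypothetical stand-in that illustrates an auto-resetting latch; not Flink's source. */
class SketchMultiShotLatch {
    private final Object lock = new Object();
    private boolean triggered;

    /** Sets the flag and wakes waiting threads. */
    void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /** Waits for a trigger, then clears the flag so the next await() blocks again. */
    void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
            triggered = false;
        }
    }

    boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }

    /** Demo helper: two triggers before one await() leave the latch untriggered. */
    static boolean triggersCoalesce() {
        SketchMultiShotLatch latch = new SketchMultiShotLatch();
        latch.trigger();
        latch.trigger(); // absorbed by the same flag, not counted separately
        try {
            latch.await(); // returns immediately and resets the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !latch.isTriggered();
    }

    public static void main(String[] args) {
        System.out.println("triggers coalesce: " + triggersCoalesce());
    }
}
```

A latch of this shape is a signal, not a counter, so patterns that need one wake-up per event typically pair it with a second latch or a queue.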

CheckedThread

Thread wrapper that captures exceptions from the thread execution and re-throws them when joining.

/**
 * Thread that catches exceptions and re-throws them on join
 * Useful for testing error conditions in background threads
 */
abstract class CheckedThread extends Thread {
    /** Override this method instead of run() - exceptions will be captured */
    abstract void go() throws Exception;
    
    /** Join the thread and re-throw any captured exceptions */
    void sync() throws Exception;
    
    /** Join with timeout and re-throw any captured exceptions */
    void sync(long timeoutMillis) throws Exception;
}

Usage Examples:

import org.apache.flink.core.testutils.CheckedThread;

CheckedThread testThread = new CheckedThread() {
    @Override
    public void go() throws Exception {
        // Test logic that might throw exceptions
        if (someCondition) {
            throw new RuntimeException("Test failure");
        }
    }
};

testThread.start();
testThread.sync(); // Will re-throw any exceptions from go()
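
The capture-and-rethrow idea can be sketched with the JDK alone. `SketchCheckedThread` is a hypothetical stand-in written for illustration, not Flink's implementation: `run()` records whatever `go()` throws, and `sync()` joins the thread and re-throws it in the caller.

```java
/** Hypothetical stand-in showing how a thread can hand its failure to the joining thread. */
abstract class SketchCheckedThread extends Thread {
    private volatile Throwable error;

    /** Test logic goes here; anything thrown is recorded instead of being lost. */
    public abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error = t;
        }
    }

    /** Joins the thread and re-throws whatever go() threw. */
    public void sync() throws Exception {
        join();
        Throwable t = error;
        if (t instanceof Exception) {
            throw (Exception) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        }
    }

    /** Demo helper: returns the message of the exception surfaced by sync(), or null. */
    static String demo() {
        SketchCheckedThread thread = new SketchCheckedThread() {
            @Override
            public void go() throws Exception {
                throw new IllegalStateException("boom");
            }
        };
        thread.start();
        try {
            thread.sync();
            return null;
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("surfaced: " + demo());
    }
}
```

Without this pattern, an assertion failure in a background thread only reaches the thread's uncaught-exception handler and the test can pass by accident.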

Test Utilities

General utility methods for common testing scenarios and operations.

CommonTestUtils

Collection of static utility methods for common test operations.

/**
 * General utility methods for unit tests
 */
class CommonTestUtils {
    /** Create a copy of an object by serializing and deserializing it */
    static <T extends Serializable> T createCopySerializable(T original) throws IOException, ClassNotFoundException;
    
    /** Create a temporary file with the specified contents */
    static String createTempFile(String contents) throws IOException;
    
    /** Block the current thread permanently (for testing interruption) */
    static void blockForeverNonInterruptibly();
    
    /** Set environment variables for testing */
    static void setEnv(Map<String, String> newenv);
    
    /** Check if exception chain contains a specific cause type */
    static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause);
    
    /** Wait until a condition becomes true or timeout expires */
    static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg) 
        throws Exception;
}

Usage Examples:

import org.apache.flink.core.testutils.CommonTestUtils;

// Test serialization
MyObject original = new MyObject();
MyObject copy = CommonTestUtils.createCopySerializable(original);
assertEquals(original, copy);

// Wait for condition
CommonTestUtils.waitUtil(
    () -> service.isReady(),
    Duration.ofSeconds(10),
    "Service did not become ready"
);

// Check exception chain
try {
    riskyOperation();
} catch (Exception e) {
    assertTrue(CommonTestUtils.containsCause(e, IOException.class));
}
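
A wait-for-condition helper of the kind shown above usually boils down to a polling loop with a deadline. The sketch below is a JDK-only illustration under that assumption; `SketchWait`, `waitUntil`, `succeeds`, and the 10 ms polling interval are hypothetical choices, not Flink's implementation.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical polling helper illustrating a wait-for-condition utility. */
class SketchWait {
    /** Polls the condition every 10 ms until it holds or the timeout expires. */
    static void waitUntil(Supplier<Boolean> condition, Duration timeout, String errorMsg)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.get()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(errorMsg);
            }
            Thread.sleep(10);
        }
    }

    /** Demo helper: true if the wait succeeds, false on timeout or interrupt. */
    static boolean succeeds(Supplier<Boolean> condition, Duration timeout) {
        try {
            waitUntil(condition, timeout, "condition not met");
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        boolean ok = succeeds(() -> System.currentTimeMillis() - start >= 50, Duration.ofSeconds(5));
        System.out.println("condition met: " + ok);
    }
}
```

The condition is checked before the deadline, so a condition that is already true returns without sleeping.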

Enhanced Assertions

Flink-specific assertion utilities that extend AssertJ with specialized testing capabilities.

FlinkAssertions

Static factory methods for creating Flink-specific assertions.

/**
 * Enhanced AssertJ assertions for Flink testing
 */
class FlinkAssertions extends Assertions {
    /** Create enhanced assertions for CompletableFuture */
    static <T> FlinkCompletableFutureAssert<T> assertThatFuture(CompletableFuture<T> actual);
    
    /** Create assertions for exception cause chains */
    static ListAssert<Throwable> assertThatChainOfCauses(Throwable root);
    
    /** Extract chain of causes from exception */
    static Stream<Throwable> chainOfCauses(Throwable throwable);
}

FlinkCompletableFutureAssert

Enhanced CompletableFuture assertions that don't rely on timeouts.

/**
 * Enhanced CompletableFuture assertions without timeout dependencies
 */
class FlinkCompletableFutureAssert<T> 
    extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {
    
    /** Assert that the future eventually succeeds */
    FlinkCompletableFutureAssert<T> eventuallySucceeds();
    
    /** Assert that the future eventually fails */
    FlinkCompletableFutureAssert<T> eventuallyFails();
    
    /** Assert that the future completes with a specific value */
    FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);
}

Usage Examples:

import org.apache.flink.core.testutils.FlinkAssertions;

// Future assertions
CompletableFuture<String> result = asyncOperation();
FlinkAssertions.assertThatFuture(result)
    .eventuallySucceeds()
    .isEqualTo("expected result");

// Exception chain assertions
try {
    complexOperation();
} catch (Exception e) {
    FlinkAssertions.assertThatChainOfCauses(e)
        .hasSize(3)
        .extracting(Throwable::getClass)
        .contains(IOException.class, RuntimeException.class);
}
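
Assertions on a chain of causes rest on flattening `getCause()` links into a sequence. The following JDK-only sketch (Java 9 or later) shows one way to do that; `SketchCauses` and its methods are hypothetical helpers for illustration, and the sketch assumes the cause chain has no cycles.

```java
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that flattens an exception's cause chain; not Flink's source. */
class SketchCauses {
    /** Streams the throwable itself followed by each nested cause (assumes no cycles). */
    static Stream<Throwable> chainOfCauses(Throwable root) {
        return Stream.iterate(root, Objects::nonNull, Throwable::getCause);
    }

    /** Returns true if the throwable or any of its causes is an instance of the type. */
    static boolean containsCause(Throwable root, Class<? extends Throwable> type) {
        return chainOfCauses(root).anyMatch(type::isInstance);
    }

    public static void main(String[] args) {
        Throwable failure =
                new RuntimeException("outer", new IllegalStateException("middle", new IOException("root")));
        List<String> names = chainOfCauses(failure)
                .map(t -> t.getClass().getSimpleName())
                .collect(Collectors.toList());
        System.out.println(names);
        System.out.println(containsCause(failure, IOException.class));
    }
}
```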

JUnit Integration

Specialized JUnit rules and extensions for Flink testing scenarios.

RetryRule and Annotations

JUnit rule for automatically retrying failed tests with configurable conditions.

/**
 * JUnit rule to retry failed tests
 * Use with @RetryOnFailure or @RetryOnException annotations
 */
class RetryRule implements TestRule {
    Statement apply(Statement base, Description description);
}

/**
 * Retry test on any failure
 */
@interface RetryOnFailure {
    /** Number of retry attempts */
    int times() default 3;
}

/**
 * Retry test on specific exception types
 */
@interface RetryOnException {
    /** Number of retry attempts */
    int times() default 3;
    
    /** Exception type to retry on */
    Class<? extends Throwable> exception();
}

Usage Examples:

import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryOnException;

public class FlinkRetryTest {
    @Rule
    public RetryRule retryRule = new RetryRule();
    
    @Test
    @RetryOnFailure(times = 5)
    public void testWithRetryOnAnyFailure() {
        // Test that might fail intermittently
        if (Math.random() < 0.5) {
            fail("Random failure");
        }
    }
    
    @Test
    @RetryOnException(times = 3, exception = IOException.class)
    public void testWithRetryOnSpecificException() throws IOException {
        // Test that might throw IOException
        if (networkUnavailable()) {
            throw new IOException("Network error");
        }
    }
}
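
Conceptually, a retry rule wraps the test body in a loop. The JDK-only sketch below illustrates that loop; `SketchRetry`, `ThrowingRunnable`, and `attemptsNeeded` are hypothetical names, and the sketch assumes that `retries` counts additional attempts after the first run, which may differ from how the actual rule interprets `times`.

```java
import java.io.IOException;

/** Hypothetical retry loop illustrating what a retry rule does around a test body. */
class SketchRetry {
    /** A test body that may throw anything. */
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    /** Runs the body once, then retries up to `retries` times on the given throwable type. */
    static void runWithRetries(ThrowingRunnable body, int retries, Class<? extends Throwable> retryOn)
            throws Throwable {
        for (int attempt = 0; ; attempt++) {
            try {
                body.run();
                return;
            } catch (Throwable t) {
                boolean retryable = retryOn.isInstance(t);
                if (!retryable || attempt >= retries) {
                    throw t; // out of retries, or not the kind of failure we retry on
                }
            }
        }
    }

    /** Demo helper: the body fails `failures` times; returns attempts used, or -1 if it never passed. */
    static int attemptsNeeded(int failures, int retries) {
        int[] attempts = {0};
        try {
            runWithRetries(() -> {
                if (++attempts[0] <= failures) {
                    throw new IOException("flaky");
                }
            }, retries, IOException.class);
            return attempts[0];
        } catch (Throwable t) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("attempts: " + attemptsNeeded(2, 3));
    }
}
```

Passing `Throwable.class` as `retryOn` gives retry-on-any-failure behavior, while a narrower type leaves unrelated failures visible on the first run.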

SharedObjects

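JUnit rule that lets a test share objects with code that only receives serialized state, such as user-defined functions in a Flink job. Each call to add() returns a serializable SharedReference that resolves to the original instance. Shared objects are scoped to a single test case, not to the test class.

/**
 * Share objects between a test case and the functions it runs
 */
class SharedObjects extends ExternalResource {
    /** Create a new rule instance */
    static SharedObjects create();

    /** Register an object and return a serializable reference to it */
    <T> SharedReference<T> add(T object);
}

/**
 * Serializable reference to a shared object
 */
interface SharedReference<T> extends Serializable {
    T get();
}

Usage Examples:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;

public class SharedObjectTest {
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Test
    public void testCollectResults() {
        // JUnit 4 creates a new test class instance (and a new rule) for every
        // test method, so references are registered inside the test itself.
        SharedReference<Queue<Long>> results =
            sharedObjects.add(new ConcurrentLinkedQueue<>());

        // The reference can be captured by serialized functions; get() returns
        // the instance that was registered above.
        results.get().add(42L);

        assertEquals(1, results.get().size());
    }
}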
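
A handle like `SharedReference<T>` can stay usable after it has been serialized if it carries only an id and looks the object up in a registry that lives in the test JVM. The JDK-only sketch below illustrates that idea; `SketchSharedObjects`, `Ref`, and the static registry are assumptions made for illustration and are not taken from Flink's source.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical id-based registry illustrating serializable references to shared objects. */
class SketchSharedObjects {
    private static final Map<Integer, Object> REGISTRY = new ConcurrentHashMap<>();
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    /** Serializable handle that stores only the id of the registered object. */
    static final class Ref<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int id;

        Ref(int id) {
            this.id = id;
        }

        @SuppressWarnings("unchecked")
        T get() {
            return (T) REGISTRY.get(id);
        }
    }

    /** Registers an object and returns a handle to it. */
    static <T> Ref<T> add(T object) {
        int id = NEXT_ID.getAndIncrement();
        REGISTRY.put(id, object);
        return new Ref<>(id);
    }

    /** Demo helper: a deserialized copy of the handle resolves to the same instance. */
    static boolean survivesSerialization() {
        Object resource = new Object(); // deliberately not Serializable
        Ref<Object> ref = add(resource);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(ref);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                Ref<Object> copy = (Ref<Object>) in.readObject();
                return copy != ref && copy.get() == resource;
            }
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("same instance after round trip: " + survivesSerialization());
    }
}
```

This only works while the code that resolves the handle runs in the same JVM as the registry, and a real rule would also clear its entries when the test finishes.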

Executor Testing

Utilities for testing with controlled thread execution and scheduling.

ManuallyTriggeredScheduledExecutorService

Executor service that runs tasks only when the test triggers them, which makes task execution deterministic.

/**
 * Manually controlled executor for deterministic testing
 * Tasks are queued but not executed until manually triggered
 */
class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {
    /** Run all runnables queued through execute() or submit() */
    void triggerAll();
    
    /** Run the next queued runnable */
    void trigger();
    
    /** Get the number of queued runnables */
    int numQueuedRunnables();
    
    /** Run all tasks registered through schedule(), regardless of their delay */
    void triggerScheduledTasks();
}

Usage Examples:

import java.util.concurrent.TimeUnit;

import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;

ManuallyTriggeredScheduledExecutorService executor = 
    new ManuallyTriggeredScheduledExecutorService();

// Queue two runnables and register one delayed task
executor.execute(() -> System.out.println("Task 1"));
executor.execute(() -> System.out.println("Task 2"));
executor.schedule(() -> System.out.println("Delayed task"), 1, TimeUnit.SECONDS);

// Nothing has run yet
assertEquals(2, executor.numQueuedRunnables());

// Manually trigger execution
executor.trigger();               // Executes "Task 1"
executor.triggerAll();            // Executes the remaining queued runnables
executor.triggerScheduledTasks(); // Executes "Delayed task" without waiting for the delay
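
The core idea, queue now and run only on demand, can be sketched with a plain `Executor`. `SketchManualExecutor` and its `runNext`, `runAll`, and `pending` methods are hypothetical names for illustration, not the Flink class; the sketch is single-threaded and ignores scheduling delays entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical executor that runs tasks only when the test asks it to; not thread-safe. */
class SketchManualExecutor implements Executor {
    private final Queue<Runnable> queued = new ArrayDeque<>();

    /** Queues the task instead of running it. */
    @Override
    public void execute(Runnable command) {
        queued.add(command);
    }

    /** Runs the oldest queued task on the calling thread. */
    void runNext() {
        Runnable next = queued.poll();
        if (next == null) {
            throw new IllegalStateException("no queued task");
        }
        next.run();
    }

    /** Runs queued tasks in submission order, including tasks queued while draining. */
    void runAll() {
        while (!queued.isEmpty()) {
            queued.poll().run();
        }
    }

    int pending() {
        return queued.size();
    }

    /** Demo helper: records the order in which events happen. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SketchManualExecutor executor = new SketchManualExecutor();
        executor.execute(() -> log.add("task 1"));
        executor.execute(() -> log.add("task 2"));
        executor.execute(() -> log.add("task 3"));
        log.add("queued " + executor.pending());
        executor.runNext();
        log.add("after next");
        executor.runAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because every task runs on the test thread at a point the test chooses, assertions can be placed between tasks without sleeps or timeouts.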

Test Marker Annotations

Annotations for marking tests that have known compatibility issues with specific environments.

/** Mark tests that fail on Java 11 */
@interface FailsOnJava11 {}

/** Mark tests that fail on Java 17 */  
@interface FailsOnJava17 {}

/** Mark tests that fail with adaptive scheduler */
@interface FailsWithAdaptiveScheduler {}

Usage Examples:

import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;

public class CompatibilityTest {
    @Test
    @FailsOnJava11
    public void testThatFailsOnJava11() {
        // Test implementation
    }
    
    @Test
    @FailsWithAdaptiveScheduler
    public void testThatFailsWithAdaptiveScheduler() {
        // Test implementation  
    }
}
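
Marker annotations carry no behavior of their own; a build or runner has to look for them and decide what to do. The JDK-only sketch below shows one way such filtering could work using reflection. `SketchMarkers`, `FailsOnJava11Sketch`, and `SampleTests` are hypothetical names, and the sketch does not describe how Flink's build actually consumes its markers.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of filtering test methods by a marker annotation. */
class SketchMarkers {
    /** Marker that must be retained at runtime to be visible through reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface FailsOnJava11Sketch {}

    /** Sample class with one marked and one unmarked method. */
    static class SampleTests {
        @FailsOnJava11Sketch
        public void knownFailure() {}

        public void regular() {}
    }

    /** Returns the sorted names of declared methods that do not carry the marker. */
    static List<String> methodsWithoutMarker(Class<?> testClass, Class<? extends Annotation> marker) {
        return Arrays.stream(testClass.getDeclaredMethods())
                .filter(m -> !m.isSynthetic())
                .filter(m -> !m.isAnnotationPresent(marker))
                .map(Method::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(methodsWithoutMarker(SampleTests.class, FailsOnJava11Sketch.class));
    }
}
```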

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent
