Comprehensive testing utilities for Apache Flink stream processing framework
npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils-parent@2.1.0Apache Flink Test Utils Parent is a comprehensive collection of testing utilities for Apache Flink stream processing framework applications. This multi-module Maven project provides everything needed to test Flink applications effectively, from basic unit testing with synchronization utilities to complex connector testing, migration testing, and specialized testing for different Flink components.
<!-- Core testing utilities -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<!-- JUnit integration -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<!-- Connector testing framework -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<!-- Migration testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-migration-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<!-- Client testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>
<!-- Table filesystem testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-filesystem-test-utils</artifactId>
<version>2.1.0</version>
<scope>test</scope>
</dependency>// Core test synchronization (flink-test-utils-junit)
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.test.junit5.MiniClusterExtension;
// Test utilities (flink-test-utils)
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.util.MetricListener;
import org.apache.flink.util.MetricAssertions;
// JUnit 5 integration (flink-test-utils-junit)
import org.apache.flink.testutils.junit.extensions.TestLoggerExtension;
import org.apache.flink.testutils.junit.extensions.RetryExtension;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
// Connector testing framework (flink-connector-test-utils)
import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
// Migration testing (flink-migration-test-utils)
import org.apache.flink.test.migration.MigrationTest;
import org.apache.flink.test.migration.PublishedVersionUtils;
// Client testing (flink-clients-test-utils)
import org.apache.flink.client.testjar.TestUserClassLoaderJob;
// Table filesystem testing (flink-table-filesystem-test-utils)
import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public class FlinkTestExample {
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());
@Test
public void testWithSynchronization() throws InterruptedException {
OneShotLatch latch = new OneShotLatch();
// Start background task
Thread worker = new Thread(() -> {
// Do some work
latch.trigger(); // Signal completion
});
worker.start();
// Wait for completion
latch.await();
worker.join();
}
@Test
public void testStreamingPipeline() throws Exception {
// Set up test environment
TestStreamEnvironment.setAsContext(MINI_CLUSTER.getMiniCluster(), 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create test source with finite data
FiniteTestSource<String> source = new FiniteTestSource<>("hello", "world", "test");
DataStream<String> stream = env.addSource(source);
// Create test sink to collect results
TestListResultSink<String> sink = new TestListResultSink<>();
stream.addSink(sink);
// Execute and verify results
env.execute("Test Job");
List<String> results = sink.getResult();
assertEquals(3, results.size());
assertTrue(results.contains("hello"));
}
}The Flink Test Utils Parent is organized into six specialized modules, each targeting specific testing needs:
Essential testing utilities including thread synchronization, test assertions, and JUnit integration. Provides the foundation for reliable Flink unit tests.
// Thread synchronization (flink-test-utils-junit)
class OneShotLatch {
void trigger();
void await() throws InterruptedException;
boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;
void awaitQuietly();
boolean isTriggered();
void reset();
}
abstract class CheckedThread extends Thread {
abstract void go() throws Exception;
void sync() throws Exception;
void sync(long timeoutMillis) throws Exception;
}
// Enhanced assertions (flink-test-utils-junit)
class FlinkAssertions {
static <T> FlinkCompletableFutureAssert<T> assertThatFuture(CompletableFuture<T> actual);
static Stream<Throwable> chainOfCauses(Throwable throwable);
}
class FlinkCompletableFutureAssert<T> extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {
FlinkCompletableFutureAssert<T> eventuallySucceeds();
FlinkCompletableFutureAssert<T> eventuallyFails();
FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);
}
// Manual executor (flink-test-utils-junit)
class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {
void triggerAll();
void triggerScheduledTasks();
void triggerPeriodicScheduledTasks();
Collection<ScheduledTask<?>> getScheduledTasks();
int getNumQueuedRunnables();
}Core Testing and Synchronization
Test execution environments, data sources, and utilities for creating controlled testing scenarios in Flink applications.
// Test environments (flink-test-utils)
class TestStreamEnvironment extends StreamExecutionEnvironment {
TestStreamEnvironment(MiniCluster miniCluster, int parallelism);
TestStreamEnvironment(MiniCluster miniCluster, Configuration config, int parallelism,
Collection<Path> jarFiles, Collection<URL> classPaths);
static void setAsContext(MiniCluster miniCluster, int parallelism);
static void setAsContext(MiniCluster miniCluster, int parallelism,
Collection<Path> jarFiles, Collection<URL> classpaths);
static void unsetAsContext();
JobExecutionResult getLastJobExecutionResult();
}
// MiniCluster extension (flink-test-utils)
class MiniClusterExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback,
AfterEachCallback, ParameterResolver {
MiniClusterExtension(MiniClusterResourceConfiguration configuration);
ClusterClient<?> getClusterClient();
URI getRestAddress();
MiniCluster getMiniCluster();
int getNumberSlots();
}
// Test data sources (flink-test-utils)
class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
FiniteTestSource(T... elements);
FiniteTestSource(Iterable<T> elements);
FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
void run(SourceContext<T> ctx) throws Exception;
void cancel();
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
// Test sinks (flink-test-utils)
class TestListResultSink<T> extends RichSinkFunction<T> {
TestListResultSink();
void invoke(T value) throws Exception;
List<T> getResult();
List<T> getSortedResult();
}Test Environments and Data Sources
Comprehensive testing framework for Flink connectors with support for external systems, multiple test environments, and automated test suites.
// Test framework extension (flink-connector-test-utils)
@ExtendWith(ConnectorTestingExtension.class)
class ConnectorTestingExtension implements TestTemplateInvocationContextProvider {
boolean supportsTestTemplate(ExtensionContext context);
Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);
}
// Test environment interfaces (flink-connector-test-utils)
interface TestEnvironment extends TestResource {
JobExecutionResult executeJob(JobGraph job) throws Exception;
ClusterClient<?> getClusterClient();
String getRestAddress();
String getWebUIUrl();
}
interface TestResource extends AutoCloseable {
void startUp() throws Exception;
void tearDown() throws Exception;
}
// Test environment implementations (flink-connector-test-utils)
class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
MiniClusterTestEnvironment();
MiniClusterTestEnvironment(MiniClusterConfiguration config);
static Builder builder();
static class Builder {
Builder setParallelism(int parallelism);
Builder setCheckpointingEnabled(boolean enabled);
Builder setCheckpointInterval(Duration interval);
MiniClusterTestEnvironment build();
}
}
// Test suite base classes (flink-connector-test-utils)
abstract class SourceTestSuiteBase<T, SplitT extends SourceSplit> {
@TestTemplate void testSourceReading(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);
@TestTemplate void testTaskManagerFailover(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);
@TestTemplate void testJobManagerFailover(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);
protected abstract ExternalContext<DataStreamSourceExternalContext<T>> sourceExternalContext();
protected abstract Source<T, SplitT, ?> source();
}
abstract class SinkTestSuiteBase<T> {
@TestTemplate void testSinkWriteWithSingleSubtask(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
@TestTemplate void testSinkWriteWithMultipleSubtasks(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
@TestTemplate void testScaleUpSinkWriter(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
@TestTemplate void testScaleDownSinkWriter(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);
protected abstract ExternalContext<DataStreamSinkV2<T>> sinkExternalContext();
protected abstract List<T> generateTestData(TestingSinkSettings sinkSettings, ExternalContext<DataStreamSinkV2<T>> externalContext);
}Utilities for testing state migration between Flink versions and ensuring compatibility across version upgrades.
// Migration testing (flink-migration-test-utils)
interface MigrationTest {
static FlinkVersion getMostRecentlyPublishedVersion();
@SnapshotsGenerator
@interface SnapshotsGenerator {}
@ParameterizedSnapshotsGenerator
@interface ParameterizedSnapshotsGenerator {
String value();
}
}
// Version utilities (flink-migration-test-utils)
class PublishedVersionUtils {
static FlinkVersion getMostRecentlyPublishedVersion();
static List<FlinkVersion> getPublishedVersions();
}
class SnapshotGeneratorUtils {
static void generateSnapshots(Class<?> testClass, FlinkVersion flinkVersion, String targetDir) throws Exception;
}Migration and Compatibility Testing
Specialized testing utilities for Flink Table API applications and filesystem-based connectors.
// Filesystem table testing (flink-table-filesystem-test-utils)
class TestFileSystemTableFactory extends FileSystemTableFactory {
static final String IDENTIFIER = "test-filesystem";
String factoryIdentifier();
DynamicTableSource createDynamicTableSource(Context context);
DynamicTableSink createDynamicTableSink(Context context);
}
class TestFileSystemCatalog extends GenericInMemoryCatalog {
TestFileSystemCatalog(String catalogName, String defaultDatabaseName, String basePath);
static boolean isFileSystemTable(Map<String, String> options);
void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) throws Exception;
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws Exception;
}
class TestFileSystemCatalogFactory implements CatalogFactory {
static final String IDENTIFIER = "test-filesystem-catalog";
String factoryIdentifier();
Catalog createCatalog(Context context);
}Table API and Filesystem Testing
Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios.
// Client testing utilities (flink-clients-test-utils)
class TestUserClassLoaderJob {
static void main(String[] args) throws Exception;
static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;
static boolean verifyClassLoading(String className, ClassLoader expectedLoader);
static JobExecutionResult getLastExecutionResult();
}
class TestUserClassLoaderAdditionalArtifact {
static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;
static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);
static ArtifactMetadata getArtifactMetadata(String artifactPath);
static List<String> resolveDependencies(String artifactPath);
}
class TestUserClassLoaderJobLib {
static void someLibMethod();
}// Core synchronization types (flink-test-utils-junit)
class OneShotLatch {
void trigger();
void await() throws InterruptedException;
boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;
void awaitQuietly();
boolean isTriggered();
void reset();
}
abstract class CheckedThread extends Thread {
CheckedThread(String name);
abstract void go() throws Exception;
void sync() throws Exception;
Throwable getError();
}
// Test assertion types (flink-test-utils-junit)
class FlinkCompletableFutureAssert<T> extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {
FlinkCompletableFutureAssert<T> eventuallySucceeds();
FlinkCompletableFutureAssert<T> eventuallyFails();
FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);
}
// Test environment interfaces (flink-connector-test-utils)
interface TestEnvironment extends TestResource {
JobExecutionResult executeJob(JobGraph job) throws Exception;
ClusterClient<?> getClusterClient();
String getRestAddress();
String getWebUIUrl();
}
interface TestResource extends AutoCloseable {
void startUp() throws Exception;
void tearDown() throws Exception;
void close() throws Exception;
}
interface ExternalContext<T> extends AutoCloseable {
void setUp() throws Exception;
void tearDown() throws Exception;
Properties getConnectionProperties();
String generateTestId();
}
// Test data interfaces (flink-connector-test-utils)
interface ExternalSystemDataReader<T> extends AutoCloseable {
List<T> readData() throws Exception;
List<T> readData(Duration timeout) throws Exception;
List<T> readData(Predicate<T> filter) throws Exception;
void close() throws Exception;
}
interface ExternalSystemSplitDataWriter<T> extends AutoCloseable {
void writeSplit(List<T> data, int splitIndex) throws Exception;
void writeAndFinalize(List<List<T>> splits) throws Exception;
int getMaxParallelism();
void close() throws Exception;
}
// Test source/sink types (flink-test-utils)
class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
FiniteTestSource(T... elements);
FiniteTestSource(Iterable<T> elements);
FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);
void run(SourceContext<T> ctx) throws Exception;
void cancel();
void notifyCheckpointComplete(long checkpointId) throws Exception;
void notifyCheckpointAborted(long checkpointId);
}
class TestListResultSink<T> extends RichSinkFunction<T> {
TestListResultSink();
void invoke(T value) throws Exception;
List<T> getResult();
List<T> getSortedResult();
}
// Metric testing types (flink-test-utils)
class MetricListener {
MetricListener();
MetricGroup getMetricGroup();
<T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);
Optional<Meter> getMeter(String... identifier);
Optional<Counter> getCounter(String... identifier);
Optional<Histogram> getHistogram(String... identifier);
<T> Optional<Gauge<T>> getGauge(String... identifier);
}
// Migration testing types (flink-migration-test-utils)
interface MigrationTest {
static FlinkVersion getMostRecentlyPublishedVersion();
}
class FlinkVersion implements Comparable<FlinkVersion> {
int getMajor();
int getMinor();
int getPatch();
String toString();
boolean isNewerThan(FlinkVersion other);
int compareTo(FlinkVersion other);
}
// Client testing types (flink-clients-test-utils)
class ArtifactMetadata {
String getName();
String getVersion();
List<String> getDependencies();
Map<String, String> getManifestAttributes();
}