or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.mdsecurity-testing.mdtest-base-classes.mdtest-data.mdtest-environments.mdtest-utilities.md
tile.json

tessl/maven-org-apache-flink--flink-test-utils_2-10

Testing utilities for Apache Flink stream processing framework providing test environments, base classes and sample data

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-test-utils_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils_2-10@1.3.0

index.mddocs/

Flink Test Utils

Flink Test Utils is a comprehensive testing library for Apache Flink applications providing test environments, base classes, utilities, and sample data for unit and integration testing of both streaming and batch processing applications.

Package Information

  • Package Name: flink-test-utils_2.10
  • Package Type: Maven
  • Language: Java (with Scala 2.10 support)
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils_2.10</artifactId>
      <version>1.3.3</version>
      <scope>test</scope>
    </dependency>

Core Imports

import org.apache.flink.test.util.*;
import org.apache.flink.streaming.util.*;
import org.apache.flink.test.testdata.*;

Basic Usage

// Basic test setup
public class MyFlinkTest extends JavaProgramTestBase {
    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // Your test logic here
        DataSet<String> input = env.fromElements("hello", "world");
        DataSet<String> result = input.map(s -> s.toUpperCase());
        
        // Collect and verify results
        List<String> resultList = result.collect();
        TestBaseUtils.compareResultAsText(resultList, "HELLO\\nWORLD");
    }
}

Architecture

The library is organized into three main areas:

  • Test Environments: Execution environments for running Flink jobs in test contexts
  • Test Base Classes: Abstract base classes providing standardized test patterns and cluster management
  • Test Utilities: Helper classes for result validation, cluster management, and security testing
  • Sample Test Data: Pre-built datasets for common algorithms and testing scenarios

Capabilities

Test Environment Management

Provides specialized execution environments for testing Flink applications in controlled environments.

// Batch job testing environment
public class TestEnvironment extends ExecutionEnvironment {
    public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
}

// Streaming job testing environment  
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
}

Test Environments

Test Base Classes

Abstract base classes that provide standardized testing patterns, cluster lifecycle management, and parameterized testing across multiple execution modes.

// Base class for batch program tests
public abstract class JavaProgramTestBase extends AbstractTestBase {
    public JavaProgramTestBase();
    protected abstract void testProgram() throws Exception;
    public void setParallelism(int parallelism);
}

// Base class for streaming program tests
public abstract class StreamingProgramTestBase extends AbstractTestBase {
    protected abstract void testProgram() throws Exception;
    public void setParallelism(int parallelism);
}

// Base class for multiple program tests
public class MultipleProgramsTestBase extends TestBaseUtils {
    public MultipleProgramsTestBase(TestExecutionMode mode);
}

Test Base Classes

Test Utilities and Result Validation

Comprehensive utilities for cluster management, result comparison, test data handling, and specialized testing utilities.

public class TestBaseUtils extends TestLogger {
    // Cluster management
    public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots, 
                                                     boolean startWebserver, boolean startZooKeeper, 
                                                     boolean singleActorSystem) throws Exception;
    
    // Result validation and comparison
    public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
    public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
    public static <T> void compareResultAsTuples(List<T> result, String expected);
    public static <T> void compareResultAsText(List<T> result, String expected);
    public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;
    public static void checkLinesAgainstRegexp(String resultPath, String regexp);
    
    // File I/O utilities
    public static BufferedReader[] getResultReader(String resultPath) throws IOException;
    public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
    
    // Utility methods
    public static String constructTestPath(Class<?> forClass, String folder);
    public static String getFromHTTP(String url) throws Exception;
}

// Specialized testing utilities
public abstract class CheckedThread extends Thread {
    public abstract void go() throws Exception;
    public void sync() throws Exception;
}

public class RetryRule implements TestRule {
    // JUnit rule for automatic test retries
}

public class CommonTestUtils {
    public static <T extends Serializable> T createCopySerializable(T original);
    public static void setEnv(Map<String, String> newenv);
}

Test Utilities

Security Testing

Support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC.

public class SecureTestEnvironment {
    public static void prepare(TemporaryFolder tempFolder);
    public static void cleanup();
    public static Configuration populateFlinkSecureConfigurations(Configuration flinkConf);
}

public class TestingSecurityContext {
    public static void install(SecurityConfiguration config, 
                             Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;
}

Security Testing

Sample Test Data

Pre-built datasets and validation utilities for common Flink algorithms and testing scenarios.

// PageRank test data
public class PageRankData {
    public static final String VERTICES;
    public static final String EDGES;
    public static final String RANKS_AFTER_3_ITERATIONS;
}

// K-Means clustering test data
public class KMeansData {
    public static final String DATAPOINTS;
    public static final String INITIAL_CENTERS;
    public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, double maxDelta);
}

Sample Test Data

Common Types

// Test execution modes
public enum TestExecutionMode {
    CLUSTER,
    CLUSTER_OBJECT_REUSE, 
    COLLECTION
}

// Security configuration for testing
public static class ClientSecurityConfiguration {
    public String getPrincipal();
    public String getKeytab();
    public ClientSecurityConfiguration(String principal, String keytab);
}

// Test annotations for retry mechanisms
public @interface RetryOnFailure {
    int times();
}

public @interface RetryOnException {
    int times();
    Class<? extends Throwable> exception();
}

// Local Flink cluster for testing
public class LocalFlinkMiniCluster {
    // Mini cluster used by all test environments
}

// Duration utilities
public class FiniteDuration {
    // Timeout specifications for cluster operations
}