CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils-2-10

Testing utilities for Apache Flink stream processing framework providing test environments, base classes and sample data

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

Flink Test Utils

Flink Test Utils is a comprehensive testing library for Apache Flink applications providing test environments, base classes, utilities, and sample data for unit and integration testing of both streaming and batch processing applications.

Package Information

  • Package Name: flink-test-utils_2.10
  • Package Type: Maven
  • Language: Java (with Scala 2.10 support)
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils_2.10</artifactId>
      <version>1.3.3</version>
      <scope>test</scope>
    </dependency>

Core Imports

import org.apache.flink.test.util.*;
import org.apache.flink.streaming.util.*;
import org.apache.flink.test.testdata.*;

Basic Usage

// Basic test setup
public class MyFlinkTest extends JavaProgramTestBase {
    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // Your test logic here
        DataSet<String> input = env.fromElements("hello", "world");
        DataSet<String> result = input.map(s -> s.toUpperCase());
        
        // Collect and verify results
        List<String> resultList = result.collect();
        TestBaseUtils.compareResultAsText(resultList, "HELLO\\nWORLD");
    }
}

Architecture

The library is organized into three main areas:

  • Test Environments: Execution environments for running Flink jobs in test contexts
  • Test Base Classes: Abstract base classes providing standardized test patterns and cluster management
  • Test Utilities: Helper classes for result validation, cluster management, and security testing
  • Sample Test Data: Pre-built datasets for common algorithms and testing scenarios

Capabilities

Test Environment Management

Provides specialized execution environments for testing Flink applications in controlled environments.

// Batch job testing environment
public class TestEnvironment extends ExecutionEnvironment {
    public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
}

// Streaming job testing environment  
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
}

Test Environments

Test Base Classes

Abstract base classes that provide standardized testing patterns, cluster lifecycle management, and parameterized testing across multiple execution modes.

// Base class for batch program tests
public abstract class JavaProgramTestBase extends AbstractTestBase {
    public JavaProgramTestBase();
    protected abstract void testProgram() throws Exception;
    public void setParallelism(int parallelism);
}

// Base class for streaming program tests
public abstract class StreamingProgramTestBase extends AbstractTestBase {
    protected abstract void testProgram() throws Exception;
    public void setParallelism(int parallelism);
}

// Base class for multiple program tests
public class MultipleProgramsTestBase extends TestBaseUtils {
    public MultipleProgramsTestBase(TestExecutionMode mode);
}

Test Base Classes

Test Utilities and Result Validation

Comprehensive utilities for cluster management, result comparison, test data handling, and specialized testing utilities.

public class TestBaseUtils extends TestLogger {
    // Cluster management
    public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots, 
                                                     boolean startWebserver, boolean startZooKeeper, 
                                                     boolean singleActorSystem) throws Exception;
    
    // Result validation and comparison
    public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
    public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
    public static <T> void compareResultAsTuples(List<T> result, String expected);
    public static <T> void compareResultAsText(List<T> result, String expected);
    public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;
    public static void checkLinesAgainstRegexp(String resultPath, String regexp);
    
    // File I/O utilities
    public static BufferedReader[] getResultReader(String resultPath) throws IOException;
    public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
    
    // Utility methods
    public static String constructTestPath(Class<?> forClass, String folder);
    public static String getFromHTTP(String url) throws Exception;
}

// Specialized testing utilities
public abstract class CheckedThread extends Thread {
    public abstract void go() throws Exception;
    public void sync() throws Exception;
}

public class RetryRule implements TestRule {
    // JUnit rule for automatic test retries
}

public class CommonTestUtils {
    public static <T extends Serializable> T createCopySerializable(T original);
    public static void setEnv(Map<String, String> newenv);
}

Test Utilities

Security Testing

Support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC.

public class SecureTestEnvironment {
    public static void prepare(TemporaryFolder tempFolder);
    public static void cleanup();
    public static Configuration populateFlinkSecureConfigurations(Configuration flinkConf);
}

public class TestingSecurityContext {
    public static void install(SecurityConfiguration config, 
                             Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;
}

Security Testing

Sample Test Data

Pre-built datasets and validation utilities for common Flink algorithms and testing scenarios.

// PageRank test data
public class PageRankData {
    public static final String VERTICES;
    public static final String EDGES;
    public static final String RANKS_AFTER_3_ITERATIONS;
}

// K-Means clustering test data
public class KMeansData {
    public static final String DATAPOINTS;
    public static final String INITIAL_CENTERS;
    public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, double maxDelta);
}

Sample Test Data

Common Types

// Test execution modes
public enum TestExecutionMode {
    CLUSTER,
    CLUSTER_OBJECT_REUSE, 
    COLLECTION
}

// Security configuration for testing
public static class ClientSecurityConfiguration {
    public String getPrincipal();
    public String getKeytab();
    public ClientSecurityConfiguration(String principal, String keytab);
}

// Test annotations for retry mechanisms
public @interface RetryOnFailure {
    int times();
}

public @interface RetryOnException {
    int times();
    Class<? extends Throwable> exception();
}

// Local Flink cluster for testing
public class LocalFlinkMiniCluster {
    // Mini cluster used by all test environments
}

// Duration utilities
public class FiniteDuration {
    // Timeout specifications for cluster operations
}
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-test-utils_2.10@1.3.x
Publish Source
CLI
Badge
tessl/maven-org-apache-flink--flink-test-utils-2-10 badge