
tessl/maven-org-apache-flink--flink-test-utils

Comprehensive testing utilities for Apache Flink stream and batch processing applications


Test Data Sources

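This section covers three kinds of utilities. The first is a finite test source that emits a fixed set of elements in step with checkpoints. The second is a result sink for collecting job output. The third is a set of sample datasets, with expected results, for common algorithms such as PageRank, K-means, and Connected Components. Together they give tests deterministic inputs and known outputs to check against.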

Capabilities

Finite Test Source

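Source function that emits a finite set of elements in lockstep with checkpointing. In Flink's implementation, the source runs through these steps:

1. It emits all elements.
2. It waits until two checkpoints have completed.
3. It emits the same elements a second time.
4. It waits for two more completed checkpoints.
5. It exits once the optional couldExit condition returns true or waitTimeOut (in milliseconds) has elapsed.

Because the source waits for completed checkpoints, the job must have checkpointing enabled, or the source never finishes. Downstream operators also receive every element twice.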

public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
    public FiniteTestSource(T... elements);
    public FiniteTestSource(Iterable<T> elements);
    public FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);
    public FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);
}
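Based on our reading of Flink's implementation, FiniteTestSource works in four stages. It emits its elements, waits for two completed checkpoints, emits them again, and waits for two more. It then polls couldExit until the condition returns true or waitTimeOut passes. The sketch below models that protocol without Flink. FiniteSourceModel is a hypothetical, simplified class, not Flink code. A background thread stands in for the checkpoint coordinator calling notifyCheckpointComplete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, Flink-free model of the FiniteTestSource emission protocol.
final class FiniteSourceModel<T> {
    private final Object checkpointLock = new Object();
    private final List<T> elements;
    private final BooleanSupplier couldExit;
    private final long waitTimeOutMs;
    private final List<T> emitted = new ArrayList<>();
    private int numCheckpointsComplete; // guarded by checkpointLock

    FiniteSourceModel(BooleanSupplier couldExit, long waitTimeOutMs, List<T> elements) {
        this.couldExit = couldExit;
        this.waitTimeOutMs = waitTimeOutMs;
        this.elements = elements;
    }

    // Plays the role of CheckpointListener#notifyCheckpointComplete.
    void notifyCheckpointComplete() {
        synchronized (checkpointLock) {
            numCheckpointsComplete++;
            checkpointLock.notifyAll();
        }
    }

    // Two rounds of "emit everything, then wait for two completed checkpoints",
    // then linger until couldExit is true or the timeout elapses.
    void run() throws InterruptedException {
        emitAndAwaitCheckpoints(2);
        emitAndAwaitCheckpoints(2);
        long deadline = System.currentTimeMillis() + waitTimeOutMs;
        while (!couldExit.getAsBoolean() && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
    }

    private void emitAndAwaitCheckpoints(int checkpoints) throws InterruptedException {
        synchronized (checkpointLock) {
            int target = numCheckpointsComplete + checkpoints;
            emitted.addAll(elements); // stands in for ctx.collect(element)
            while (numCheckpointsComplete < target) {
                checkpointLock.wait(); // releases the lock so notifications can arrive
            }
        }
    }

    // Runs the model while a background thread "completes" a checkpoint every intervalMs.
    static <T> List<T> simulate(List<T> elements, long intervalMs) {
        FiniteSourceModel<T> source = new FiniteSourceModel<>(() -> true, 1_000L, elements);
        Thread coordinator = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(intervalMs);
                    source.notifyCheckpointComplete();
                }
            } catch (InterruptedException e) {
                // interrupted once the source has finished
            }
        });
        coordinator.setDaemon(true);
        coordinator.start();
        try {
            source.run();
            coordinator.interrupt();
            coordinator.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(source.emitted);
    }

    public static void main(String[] args) {
        System.out.println(simulate(Arrays.asList(1, 2, 3), 10)); // prints [1, 2, 3, 1, 2, 3]
    }
}
```

Every element appears twice in the output. Without checkpoint notifications, run() would block forever, which is why a job using this source needs checkpointing enabled.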

Basic Usage

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;

// Create source with varargs
FiniteTestSource<Integer> numberSource = new FiniteTestSource<>(1, 2, 3, 4, 5);

// Create source with a collection
List<String> words = Arrays.asList("hello", "world", "flink");
FiniteTestSource<String> wordSource = new FiniteTestSource<>(words);

// Use in a streaming job; checkpointing is required because the source
// waits for completed checkpoints before it finishes
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100);
env.addSource(numberSource)
   .map(x -> x * 2)
   .print();
env.execute("FiniteTestSource example");

Advanced Usage with Exit Conditions

import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

// Custom exit condition: after its last emission round, the source may
// exit once 5 seconds have passed since startTime
long startTime = System.currentTimeMillis();
BooleanSupplier exitCondition = () -> System.currentTimeMillis() > startTime + 5000;
List<String> data = Arrays.asList("a", "b", "c", "d", "e");

FiniteTestSource<String> conditionalSource =
    new FiniteTestSource<>(exitCondition, data);

// Same condition, but stop waiting for it after at most 10000 ms
FiniteTestSource<String> timedSource =
    new FiniteTestSource<>(exitCondition, 10000L, data);
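The four-argument constructor combines two exit conditions. As we understand it, the source stops waiting at whichever comes first: couldExit returning true, or the timeout expiring. The hypothetical linger helper below mirrors that final wait by polling the condition every 10 ms. The ExitReason enum is illustrative only and is not part of Flink.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the post-emission wait; not Flink API.
final class LingerModel {
    enum ExitReason { COULD_EXIT, TIMED_OUT }

    // Polls couldExit every 10 ms until it returns true or waitTimeOutMs elapses.
    static ExitReason linger(BooleanSupplier couldExit, long waitTimeOutMs) {
        long deadline = System.currentTimeMillis() + waitTimeOutMs;
        try {
            while (!couldExit.getAsBoolean()) {
                if (System.currentTimeMillis() >= deadline) {
                    return ExitReason.TIMED_OUT;
                }
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ExitReason.COULD_EXIT;
    }

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        // Same shape as exitCondition above, shortened to 50 ms
        BooleanSupplier exitCondition = () -> System.currentTimeMillis() > startTime + 50;
        System.out.println(linger(exitCondition, 10_000L)); // prints COULD_EXIT
        System.out.println(linger(() -> false, 50L));       // prints TIMED_OUT
    }
}
```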

Test Result Collection

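Utilities for collecting results from streaming jobs in thread-safe lists. Flink serializes the sink and runs a copy in each parallel task. The test can therefore read the results back after env.execute() returns only when the job runs in the same JVM, for example in a local environment or a MiniCluster.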

public class TestListResultSink<T> extends RichSinkFunction<T> {
    // Sink that collects results in a thread-safe list
    public List<T> getResult();   // elements collected so far
}

public class TestListWrapper<T> {
    // Wrapper for managing test result collections
}
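Why is a JVM-wide wrapper needed at all? Flink serializes the sink and invokes a copy inside the task, so fields written by that copy are invisible to the object the test holds. The plain-Java sketch below shows one way around this. Registry and CollectingSink are hypothetical classes, modelled on the roles the text gives TestListWrapper and TestListResultSink. The serializable object keeps only a list id, and the data itself lives in a static registry. Java serialization stands in for shipping the function to a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SinkRegistryDemo {
    // Hypothetical JVM-wide store of result lists (the role given to TestListWrapper).
    static final class Registry {
        private static final List<List<Object>> LISTS = new ArrayList<>();

        static synchronized int createList() {
            LISTS.add(Collections.synchronizedList(new ArrayList<>()));
            return LISTS.size() - 1;
        }

        static synchronized List<Object> getList(int id) {
            return LISTS.get(id);
        }
    }

    // Hypothetical serializable "sink" that carries only a list id.
    static final class CollectingSink<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int listId = Registry.createList();

        void invoke(T value) {
            Registry.getList(listId).add(value);
        }

        @SuppressWarnings("unchecked")
        List<T> getResult() {
            List<Object> list = Registry.getList(listId);
            List<T> copy = new ArrayList<>();
            synchronized (list) { // iterating a synchronizedList requires its lock
                for (Object o : list) {
                    copy.add((T) o);
                }
            }
            return copy;
        }
    }

    // Serialize and deserialize, similar to copying a function instance to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CollectingSink<String> sink = new CollectingSink<>();
        CollectingSink<String> taskCopy = roundTrip(sink); // a different object
        taskCopy.invoke("A");
        taskCopy.invoke("B");
        System.out.println(sink.getResult()); // prints [A, B]
    }
}
```

The copy and the original share nothing but the id. The static registry is what makes the copy's writes visible to the test, and it also explains the same-JVM requirement.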

Usage Example

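import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

import static org.junit.jupiter.api.Assertions.assertEquals;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create result sink
TestListResultSink<String> resultSink = new TestListResultSink<>();

// Use in streaming job
env.fromElements("a", "b", "c")
   .map(String::toUpperCase)
   .addSink(resultSink);

env.execute("Test Job");

// Retrieve results; sort first, because arrival order depends on parallelism
List<String> results = new ArrayList<>(resultSink.getResult());
Collections.sort(results);
assertEquals(Arrays.asList("A", "B", "C"), results);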
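Sorting works when the elements are Comparable. For other types, an order-insensitive multiset comparison is an alternative. The helper below is hypothetical and not part of flink-test-utils. It counts occurrences, so duplicated or missing elements are caught too, which a plain Set comparison would miss.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical order-insensitive assertion helper.
final class ResultAssertions {
    // Occurrence count per element, so duplicates are compared as well.
    static <T> Map<T, Integer> counts(List<T> values) {
        Map<T, Integer> counts = new HashMap<>();
        for (T v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        return counts;
    }

    static <T> void assertSameElements(List<T> expected, List<T> actual) {
        if (!counts(expected).equals(counts(actual))) {
            throw new AssertionError("expected " + expected + " in any order but got " + actual);
        }
    }

    public static void main(String[] args) {
        // Same elements, different arrival order: passes
        assertSameElements(Arrays.asList("A", "B", "C"), Arrays.asList("C", "A", "B"));
        System.out.println("ok");
    }
}
```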

Sample Algorithm Data

Predefined datasets for testing common graph and machine learning algorithms, providing both input data and expected results for validation.

Word Count Data

Sample text data and expected word count results for testing text processing algorithms.

public class WordCountData {
    public static final String TEXT;
    public static final String EXPECTED_RESULT;
}

Usage Example

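import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;

// BATCH mode makes sum() emit one final count per word instead of running updates
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// tempDir is a JUnit @TempDir Path; WordCountMapper is your own
// FlatMapFunction<String, Tuple2<String, Integer>>
String resultPath = tempDir.resolve("wordcount").toString();

env.fromElements(WordCountData.TEXT.split("\\s+"))
   .flatMap(new WordCountMapper())
   .keyBy(t -> t.f0)
   .sum(1)
   .writeAsText(resultPath);
env.execute("WordCount Test");

// Compare the expected lines with the files written under resultPath;
// the job's output format must match the format of EXPECTED_RESULT
TestBaseUtils.compareResultsByLinesInMemory(
    WordCountData.EXPECTED_RESULT, resultPath);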
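When a word-count job disagrees with WordCountData.EXPECTED_RESULT, a plain-Java count over the same text can help tell a tokenizer problem from a job problem. The sketch assumes lower-casing and splitting on non-word characters, similar to the tokenizer in Flink's WordCount example. It makes no assumption about how EXPECTED_RESULT is formatted, so convert either side before comparing. ReferenceWordCount is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference word count for cross-checking job output.
final class ReferenceWordCount {
    // Lower-cases and splits on non-word characters (tokenization is an assumption).
    static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String token : text.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) { // a leading separator yields one empty token
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("To be, or not to be")); // prints {be=2, not=1, or=1, to=2}
    }
}
```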

Connected Components Data

Graph data for testing connected components algorithms with vertices, edges, and expected component assignments.

public class ConnectedComponentsData {
    public static final String VERTEX_DATA;
    public static final String EDGE_DATA; 
    public static final String EXPECTED_RESULT;
}

Usage Example

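import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.types.NullValue;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
int maxIterations = 100;  // upper bound; must be high enough for labels to converge

// parseVertices/parseEdges are placeholders that turn the text data into Gelly types
DataSet<Vertex<Long, Long>> vertices = env.fromCollection(parseVertices(ConnectedComponentsData.VERTEX_DATA));
DataSet<Edge<Long, NullValue>> edges = env.fromCollection(parseEdges(ConnectedComponentsData.EDGE_DATA));

// Gelly's ConnectedComponents runs on a Graph, not on a DataSet (requires flink-gelly)
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<>(maxIterations));

// compareWithExpected is a placeholder for your own verification
compareWithExpected(result, ConnectedComponentsData.EXPECTED_RESULT);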
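As we understand it, label-propagation connected components (the approach behind Gelly's ConnectedComponents) converges to the smallest initial label in each component. Suppose each vertex starts with its own id as its value and edges are treated as undirected. Under those assumptions, a union-find over the edge list produces the same assignment, which is handy for cross-checking tiny inputs. The class below is a hypothetical sketch, not Gelly code. Its union step always keeps the smaller root, so every root is the minimum id of its component.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical union-find reference for connected components.
final class ReferenceConnectedComponents {
    private final Map<Long, Long> parent = new HashMap<>();

    private long find(long v) {
        parent.putIfAbsent(v, v);
        long root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        // Path compression: point every vertex on the path directly at the root
        long cur = v;
        while (parent.get(cur) != root) {
            long next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    // Attach the larger root under the smaller one, so roots are component minima.
    private void union(long a, long b) {
        long ra = find(a);
        long rb = find(b);
        if (ra != rb) {
            parent.put(Math.max(ra, rb), Math.min(ra, rb));
        }
    }

    // Maps each vertex to the smallest vertex id in its (undirected) component.
    static Map<Long, Long> components(long[] vertices, long[][] edges) {
        ReferenceConnectedComponents uf = new ReferenceConnectedComponents();
        for (long v : vertices) {
            uf.find(v);
        }
        for (long[] e : edges) {
            uf.union(e[0], e[1]);
        }
        Map<Long, Long> result = new TreeMap<>();
        for (long v : vertices) {
            result.put(v, uf.find(v));
        }
        return result;
    }

    public static void main(String[] args) {
        long[] vertices = {1, 2, 3, 4, 5, 6};
        long[][] edges = {{1, 2}, {2, 3}, {4, 5}};
        System.out.println(components(vertices, edges)); // prints {1=1, 2=1, 3=1, 4=4, 5=4, 6=6}
    }
}
```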

K-Means Clustering Data

Sample points and expected cluster assignments for testing K-means clustering implementations.

public class KMeansData {
    public static final String DATAPOINTS;
    public static final String INITIAL_CENTROIDS;
    public static final String EXPECTED_RESULT;
}
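For a sanity check against KMeansData, one Lloyd iteration can be computed directly. First assign each point to its nearest centroid by squared Euclidean distance, then move every centroid to the mean of its assigned points. The sketch is hypothetical. It leaves out parsing DATAPOINTS and INITIAL_CENTROIDS into arrays, because their text format is not shown here. It also keeps an empty cluster's centroid in place, a design choice that other implementations may not share.

```java
import java.util.Arrays;

// Hypothetical single k-means (Lloyd) iteration for cross-checking.
final class ReferenceKMeansStep {
    // Index of the centroid with the smallest squared Euclidean distance to point.
    static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int c = 0; c < centroids.length; c++) {
            double d = 0;
            for (int i = 0; i < point.length; i++) {
                double diff = point[i] - centroids[c][i];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    // Assign points, then move each centroid to the mean of its points.
    // A centroid with no assigned points keeps its previous position.
    static double[][] step(double[][] points, double[][] centroids) {
        int k = centroids.length;
        int dim = centroids[0].length;
        double[][] sums = new double[k][dim];
        int[] sizes = new int[k];
        for (double[] p : points) {
            int c = nearest(p, centroids);
            sizes[c]++;
            for (int i = 0; i < dim; i++) {
                sums[c][i] += p[i];
            }
        }
        double[][] next = new double[k][];
        for (int c = 0; c < k; c++) {
            if (sizes[c] == 0) {
                next[c] = centroids[c].clone();
                continue;
            }
            next[c] = new double[dim];
            for (int i = 0; i < dim; i++) {
                next[c][i] = sums[c][i] / sizes[c];
            }
        }
        return next;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {0, 1}, {10, 10}, {10, 11}};
        double[][] centroids = {{0, 0}, {10, 10}};
        System.out.println(Arrays.deepToString(step(points, centroids))); // prints [[0.0, 0.5], [10.0, 10.5]]
    }
}
```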

PageRank Data

Web graph data with vertices, edges, and expected PageRank scores for testing PageRank algorithm implementations.

public class PageRankData {
    public static final String VERTICES;
    public static final String EDGES;
    public static final String EXPECTED_RESULT;
}
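A small power-iteration PageRank is useful for checking scores on toy graphs. The sketch makes three assumptions: a damping factor d, ranks initialized to 1/n, and dangling pages that spread their rank uniformly so the ranks keep summing to 1. Flink's own example may normalize scores or treat dangling pages differently, so compare results with a tolerance rather than as exact text. ReferencePageRank is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical power-iteration PageRank over vertices 0..n-1.
final class ReferencePageRank {
    // edges are directed pairs {source, target}; d is the damping factor.
    static double[] pageRank(int n, int[][] edges, double d, int iterations) {
        int[] outDegree = new int[n];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int it = 0; it < iterations; it++) {
            double[] next = new double[n];
            // Rank held by pages without out-links, redistributed evenly
            double dangling = 0;
            for (int v = 0; v < n; v++) {
                if (outDegree[v] == 0) {
                    dangling += rank[v];
                }
            }
            for (int[] e : edges) {
                next[e[1]] += rank[e[0]] / outDegree[e[0]];
            }
            for (int v = 0; v < n; v++) {
                next[v] = (1 - d) / n + d * (next[v] + dangling / n);
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        int[][] cycle = {{0, 1}, {1, 2}, {2, 0}};
        System.out.println(Arrays.toString(pageRank(3, cycle, 0.85, 10)));
    }
}
```

On a directed 3-cycle every page stays at 1/3 up to floating-point rounding, which makes it a convenient smoke test.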

Triangle Enumeration Data

Graph data for testing triangle enumeration algorithms in social network analysis.

public class EnumTriangleData {
    public static final String EDGES;
    public static final String EXPECTED_RESULT;
}
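Triangle enumeration on an undirected edge list can be cross-checked with the usual ordering trick. Orient each edge from the lower id to the higher id. Then, for every pair of higher neighbours b < c of a vertex a, report (a, b, c) whenever the edge (b, c) exists. Each triangle is found exactly once. The input and output shapes here are assumptions, not the format of EnumTriangleData, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical reference triangle enumeration.
final class ReferenceTriangles {
    // Edges are undirected; each triangle is reported once as {a, b, c} with a < b < c.
    static List<int[]> triangles(int[][] edges) {
        // higher.get(a) holds the neighbours of a with larger ids (duplicates collapse)
        Map<Integer, TreeSet<Integer>> higher = new HashMap<>();
        for (int[] e : edges) {
            int a = Math.min(e[0], e[1]);
            int b = Math.max(e[0], e[1]);
            if (a == b) {
                continue; // ignore self-loops
            }
            higher.computeIfAbsent(a, k -> new TreeSet<>()).add(b);
        }
        List<int[]> result = new ArrayList<>();
        for (Map.Entry<Integer, TreeSet<Integer>> entry : higher.entrySet()) {
            int a = entry.getKey();
            for (int b : entry.getValue()) {
                Set<Integer> bHigher = higher.getOrDefault(b, new TreeSet<>());
                for (int c : entry.getValue().tailSet(b, false)) {
                    if (bHigher.contains(c)) {
                        result.add(new int[] {a, b, c});
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] k4 = {{1, 2}, {1, 3}, {1, 4}, {2, 3}, {2, 4}, {3, 4}};
        System.out.println(triangles(k4).size()); // prints 4
    }
}
```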

Transitive Closure Data

Graph data for testing transitive closure algorithms with expected reachability results.

public class TransitiveClosureData {
    public static final String EDGES;
    public static final String EXPECTED_RESULT;
}
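The transitive closure of a directed graph is the set of all pairs (u, v) such that v is reachable from u. The sketch below computes it with a depth-first search from each source vertex. A dataflow job would more typically join paths with edges iteratively until nothing new appears. Two points are assumptions to check against your data: whether a vertex on a cycle should count as reaching itself (here it does), and how EXPECTED_RESULT encodes pairs. The class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical reference transitive closure.
final class ReferenceTransitiveClosure {
    // Maps each vertex with outgoing edges to every vertex reachable from it
    // through one or more directed edges.
    static Map<Long, Set<Long>> closure(long[][] edges) {
        Map<Long, Set<Long>> adjacency = new TreeMap<>();
        for (long[] e : edges) {
            adjacency.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
        }
        Map<Long, Set<Long>> reachable = new TreeMap<>();
        for (Map.Entry<Long, Set<Long>> entry : adjacency.entrySet()) {
            Set<Long> seen = new TreeSet<>();
            Deque<Long> stack = new ArrayDeque<>(entry.getValue());
            while (!stack.isEmpty()) {
                long v = stack.pop();
                if (seen.add(v)) { // expand each vertex only once
                    stack.addAll(adjacency.getOrDefault(v, Collections.<Long>emptySet()));
                }
            }
            reachable.put(entry.getKey(), seen);
        }
        return reachable;
    }

    public static void main(String[] args) {
        System.out.println(closure(new long[][] {{1, 2}, {2, 3}, {3, 4}}));
        // prints {1=[2, 3, 4], 2=[3, 4], 3=[4]}
    }
}
```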

Web Log Analysis Data

Sample web server log data and expected analysis results for testing log processing applications.

public class WebLogAnalysisData {
    public static final String LOG_DATA;
    public static final String EXPECTED_RESULT;
}

Usage Patterns

Deterministic Testing

Using finite sources for predictable test runs that terminate on their own, provided checkpointing is enabled.

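@Test
void testStreamingTransformation() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Required: FiniteTestSource finishes only after checkpoints complete
    env.enableCheckpointing(100);

    // Create deterministic source
    List<Integer> inputData = Arrays.asList(1, 2, 3, 4, 5);
    FiniteTestSource<Integer> source = new FiniteTestSource<>(inputData);

    // Collect results
    TestListResultSink<Integer> sink = new TestListResultSink<>();

    // Build and execute job
    env.addSource(source)
       .map(x -> x * x)
       .addSink(sink);

    env.execute("Square Numbers");

    // The source emits its elements twice; sort because arrival order can vary
    List<Integer> expected = Arrays.asList(1, 1, 4, 4, 9, 9, 16, 16, 25, 25);
    List<Integer> actual = new ArrayList<>(sink.getResult());
    Collections.sort(actual);
    assertEquals(expected, actual);
}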
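As we read Flink's implementation, FiniteTestSource emits its elements in two rounds. The expected output of a stateless pipeline is therefore every mapped element twice. A hypothetical helper can build that list, so tests need not hard-code the duplicates. The helper sorts its result so it can be compared with sorted actual results. EMISSION_ROUNDS encodes the two-round assumption in one place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper for building expected output of a FiniteTestSource pipeline.
final class FiniteSourceExpectations {
    // Assumption: FiniteTestSource emits its elements in two rounds.
    static final int EMISSION_ROUNDS = 2;

    // Applies fn to every input element once per round, then sorts the result.
    static <I, O extends Comparable<? super O>> List<O> expectedSorted(List<I> input, Function<I, O> fn) {
        List<O> expected = new ArrayList<>();
        for (int round = 0; round < EMISSION_ROUNDS; round++) {
            for (I element : input) {
                expected.add(fn.apply(element));
            }
        }
        Collections.sort(expected);
        return expected;
    }

    public static void main(String[] args) {
        System.out.println(expectedSorted(Arrays.asList(1, 2, 3, 4, 5), x -> x * x));
        // prints [1, 1, 4, 4, 9, 9, 16, 16, 25, 25]
    }
}
```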

Algorithm Validation

Using predefined datasets to validate algorithm implementations.

@Test
void testPageRankAlgorithm() throws Exception {
    // parseVertices, parseEdges and runPageRank are placeholders for your own code
    DataSet<Vertex> vertices = parseVertices(PageRankData.VERTICES);
    DataSet<Edge> edges = parseEdges(PageRankData.EDGES);

    // Run PageRank for 10 iterations
    DataSet<Vertex> result = runPageRank(vertices, edges, 10);

    // collect() returns the result elements, not strings; compareResultAsText
    // compares their toString() output with the expected lines, so the
    // floating-point formatting must match exactly
    List<Vertex> actualResults = result.collect();
    TestBaseUtils.compareResultAsText(actualResults, PageRankData.EXPECTED_RESULT);
}
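Exact text comparison of floating-point ranks breaks on small rounding differences. A tolerance-based check is one alternative worth considering. The sketch below parses lines of the assumed form "id score", requires both inputs to list the same ids, and accepts scores that differ by at most maxDelta. The class name and the line format are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tolerance-based comparison of "id score" lines.
final class ToleranceCompare {
    // Parses lines such as "1 0.25" (id, whitespace, score); the format is an assumption.
    static Map<String, Double> parse(String lines) {
        Map<String, Double> scores = new HashMap<>();
        for (String line : lines.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] parts = trimmed.split("\\s+");
            scores.put(parts[0], Double.parseDouble(parts[1]));
        }
        return scores;
    }

    // True when both inputs list the same ids and every score differs by at most maxDelta.
    static boolean matchesWithin(String expected, String actual, double maxDelta) {
        Map<String, Double> e = parse(expected);
        Map<String, Double> a = parse(actual);
        if (!e.keySet().equals(a.keySet())) {
            return false;
        }
        for (Map.Entry<String, Double> entry : e.entrySet()) {
            if (Math.abs(entry.getValue() - a.get(entry.getKey())) > maxDelta) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Line order does not matter; tiny differences are tolerated
        System.out.println(matchesWithin("1 0.25\n2 0.75", "2 0.7500001\n1 0.2499999", 1e-4)); // prints true
    }
}
```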

Timeout-Based Testing

Using the couldExit condition and waitTimeOut to control how long the source stays alive after its final emission round.

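@Test
void testWithTimeout() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Required: FiniteTestSource finishes only after checkpoints complete
    env.enableCheckpointing(100);

    long timeoutMs = 5000L;
    long startTime = System.currentTimeMillis();
    BooleanSupplier timeoutCondition = () ->
        System.currentTimeMillis() > startTime + timeoutMs;

    List<String> data = generateLargeDataset();  // placeholder for your own data
    FiniteTestSource<String> source =
        new FiniteTestSource<>(timeoutCondition, timeoutMs, data);

    env.addSource(source)
       .map(String::toUpperCase)
       .print();

    env.execute("Timeout Test");

    // The source exits only once timeoutCondition holds or timeoutMs has passed
    // after its last emission round, so the job cannot end before startTime + timeoutMs
    assertTrue(System.currentTimeMillis() - startTime >= timeoutMs);
}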

Checkpoint Integration

Using finite sources with checkpoint coordination for testing fault tolerance.

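@Test
void testCheckpointingWithFiniteSource() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing (interval in milliseconds)
    env.enableCheckpointing(1000);

    // Create source that coordinates with checkpoints
    FiniteTestSource<Long> source = new FiniteTestSource<>(1L, 2L, 3L, 4L, 5L);

    // StatefulMapper is a placeholder for your own keyed, stateful MapFunction
    env.addSource(source)
       .keyBy(x -> x % 2)
       .map(new StatefulMapper())
       .print();

    env.execute("Checkpoint Test");

    // The source finishes only after four checkpoints have completed, so a
    // successful execute() already implies that checkpoints were taken.
    // Further checks (for example on restored state) depend on your strategy.
}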

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils
