Test Data Providers

Pre-built test datasets for common algorithms and use cases. These providers supply consistent input data and expected results for PageRank, KMeans, WordCount, and other standard Flink examples, along with validation utilities.

Capabilities

Word Count Data

WordCountData provides input text and the expected results for word-count programs.

/**
 * Test data for WordCount programs
 */
public class WordCountData {
    /**
     * Goethe Faust text for word counting
     */
    public static final String TEXT;

    /**
     * Expected word counts
     */
    public static final String COUNTS;

    /**
     * Expected streaming word count tuples
     */
    public static final String STREAMING_COUNTS_AS_TUPLES;

    /**
     * Expected word count tuples
     */
    public static final String COUNTS_AS_TUPLES;
}

Usage Example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.util.Collector;

import org.junit.Test;

import java.util.List;

@Test
public void testWordCount() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Use the bundled input text, one line per element
    DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\\n"));
    
    // Tokenize, then count occurrences per word
    DataSet<Tuple2<String, Integer>> counts = text
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
        // Lambdas with generic output types need an explicit type hint
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .groupBy(0)
        .sum(1);
    
    List<Tuple2<String, Integer>> result = counts.collect();
    
    // Validate against the expected results bundled with the provider
    String expectedCounts = WordCountData.COUNTS;
    // Compare result with expectedCounts (see the TestBaseUtils helpers below)...
}

K-Means Data

KMeansData provides test datasets for K-means clustering in 2D and 3D variants, together with a delta-tolerant result validator.

/**
 * Test data for KMeans programs
 */
public class KMeansData {
    // 3D Data Constants
    /**
     * 3D data points for clustering
     */
    public static final String DATAPOINTS;

    /**
     * Initial cluster centers for 3D data
     */
    public static final String INITIAL_CENTERS;

    /**
     * Centers after one iteration for 3D data
     */
    public static final String CENTERS_AFTER_ONE_STEP;

    // Additional iteration results available...

    // 2D Data Constants
    /**
     * 2D data points for clustering
     */
    public static final String DATAPOINTS_2D;

    /**
     * Initial 2D cluster centers
     */
    public static final String INITIAL_CENTERS_2D;

    // Additional 2D iteration results available...

    /**
     * Validates K-means results with delta tolerance
     * @param expectedResult Expected result string
     * @param result Actual result list
     * @param maxDelta Maximum allowed delta for floating point comparison
     */
    public static void checkResultsWithDelta(
        String expectedResult, 
        List<String> result, 
        double maxDelta
    ) throws Exception;
}

Usage Example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.testdata.KMeansData;

import org.junit.Test;

import java.util.List;

@Test
public void testKMeans() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Load test data points and initial centers, one record per element
    // (adjust the splitting below to the actual record format of the constants)
    DataSet<String> dataPoints = env.fromElements(KMeansData.DATAPOINTS.split("\\n"));
    DataSet<String> initialCenters = env.fromElements(KMeansData.INITIAL_CENTERS.split("\\n"));
    
    // Parse data points into Point objects (Point, Centroid, SelectNearestCenter,
    // and CentroidAccumulator are assumed helper classes, analogous to those in
    // Flink's KMeans example)
    DataSet<Point> points = dataPoints.map(line -> {
        String[] coords = line.split(" ");
        return new Point(
            Double.parseDouble(coords[0]), 
            Double.parseDouble(coords[1]), 
            Double.parseDouble(coords[2])
        );
    });
    
    DataSet<Centroid> centroids = initialCenters.map(line -> {
        String[] coords = line.split(" ");
        return new Centroid(
            Integer.parseInt(coords[0]),
            Double.parseDouble(coords[1]), 
            Double.parseDouble(coords[2]), 
            Double.parseDouble(coords[3])
        );
    });
    
    // Run one iteration of K-means: assign each point to its nearest center,
    // then recompute the centers per cluster
    DataSet<Centroid> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(centroids, "centroids")
        .groupBy(0)
        .reduceGroup(new CentroidAccumulator());
    
    List<String> result = newCentroids.map(c -> c.toString()).collect();
    
    // Validate against expected results with a floating-point delta tolerance
    KMeansData.checkResultsWithDelta(
        KMeansData.CENTERS_AFTER_ONE_STEP, 
        result, 
        0.01
    );
}

PageRank Data

PageRankData provides a small test graph and the expected rankings for PageRank programs.

/**
 * Test data for PageRank programs
 */
public class PageRankData {
    /**
     * Number of vertices in test graph
     */
    public static final int NUM_VERTICES = 5;

    /**
     * Vertex data for PageRank
     */
    public static final String VERTICES;

    /**
     * Edge data for PageRank graph
     */
    public static final String EDGES;

    /**
     * Expected ranks after 3 iterations
     */
    public static final String RANKS_AFTER_3_ITERATIONS;

    /**
     * Expected ranks after convergence with epsilon 0.0001
     */
    public static final String RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}

Usage Example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testdata.PageRankData;

import org.junit.Test;

import java.util.List;

@Test
public void testPageRank() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Load vertices and edges, one record per line
    DataSet<String> verticesText = env.fromElements(PageRankData.VERTICES.split("\\n"));
    DataSet<String> edgesText = env.fromElements(PageRankData.EDGES.split("\\n"));
    
    // Parse into (vertex, rank) and (source, target) tuples; lambdas with
    // generic output types need explicit type hints
    DataSet<Tuple2<Long, Double>> vertices = verticesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Double.parseDouble(parts[1]));
    }).returns(Types.TUPLE(Types.LONG, Types.DOUBLE));
    
    DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    }).returns(Types.TUPLE(Types.LONG, Types.LONG));
    
    // Run PageRank for 3 iterations (runPageRank is the implementation under test)
    DataSet<Tuple2<Long, Double>> ranks = runPageRank(vertices, edges, 3);
    
    List<String> result = ranks.map(r -> r.f0 + " " + r.f1).collect();
    
    // Validate against expected results
    String expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
    // Compare results...
}

Connected Components Data

ConnectedComponentsData provides test data for connected components algorithms.

/**
 * Test data for Connected Components programs
 */
public class ConnectedComponentsData {
    /**
     * Generates enumerated vertices
     * @param num Number of vertices to generate
     * @return String with vertex data
     */
    public static String getEnumeratingVertices(int num);

    /**
     * Generates odd/even connected edges
     * @param numVertices Number of vertices
     * @param numEdges Number of edges to generate
     * @param seed Random seed for reproducible results
     * @return String with edge data
     */
    public static String getRandomOddEvenEdges(int numVertices, int numEdges, long seed);

    /**
     * Validates odd/even connected component results
     * @param result BufferedReader with results
     */
    public static void checkOddEvenResult(BufferedReader result) throws Exception;

    /**
     * Validates tuple-based connected component results
     * @param result List of component tuples
     */
    public static void checkOddEvenResult(List<Tuple2<Long, Long>> result) throws Exception;
}

Usage Example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testdata.ConnectedComponentsData;

import org.junit.Test;

import java.util.List;

@Test
public void testConnectedComponents() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Generate reproducible test data: 10 vertices, 15 edges, fixed seed
    String verticesData = ConnectedComponentsData.getEnumeratingVertices(10);
    String edgesData = ConnectedComponentsData.getRandomOddEvenEdges(10, 15, 12345L);
    
    DataSet<String> verticesText = env.fromElements(verticesData.split("\\n"));
    DataSet<String> edgesText = env.fromElements(edgesData.split("\\n"));
    
    // Each vertex starts in its own component (component ID = vertex ID);
    // the Tuple2 lambdas need explicit type hints
    DataSet<Tuple2<Long, Long>> vertices = verticesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[0]));
    }).returns(Types.TUPLE(Types.LONG, Types.LONG));
    
    DataSet<Tuple2<Long, Long>> edges = edgesText.map(line -> {
        String[] parts = line.split("\\s+");
        return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    }).returns(Types.TUPLE(Types.LONG, Types.LONG));
    
    // Run the connected components implementation under test
    DataSet<Tuple2<Long, Long>> components = runConnectedComponents(vertices, edges);
    
    List<Tuple2<Long, Long>> result = components.collect();
    
    // Validate the odd/even component structure
    ConnectedComponentsData.checkOddEvenResult(result);
}

Transitive Closure Data

TransitiveClosureData provides test data for transitive closure algorithms.

/**
 * Test data for Transitive Closure programs
 */
public class TransitiveClosureData {
    /**
     * Validates odd/even transitive closure results
     * @param result BufferedReader with results
     */
    public static void checkOddEvenResult(BufferedReader result) throws Exception;
}
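
Usage Example (a minimal sketch: runTransitiveClosure stands in for the algorithm under test, and the odd/even edge generator from ConnectedComponentsData is assumed as input, since this provider ships only the validator):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.testdata.TransitiveClosureData;

import org.junit.Test;

import java.io.BufferedReader;
import java.io.StringReader;

@Test
public void testTransitiveClosure() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Reuse the odd/even edge generator for reproducible input
    String edgesData = ConnectedComponentsData.getRandomOddEvenEdges(10, 15, 12345L);
    DataSet<String> edgesText = env.fromElements(edgesData.split("\\n"));
    
    // runTransitiveClosure is a placeholder for the implementation under test
    DataSet<Tuple2<Long, Long>> closure = runTransitiveClosure(edgesText);
    
    // checkOddEvenResult reads results line by line from a BufferedReader
    StringBuilder sb = new StringBuilder();
    for (Tuple2<Long, Long> pair : closure.collect()) {
        sb.append(pair.f0).append(" ").append(pair.f1).append("\n");
    }
    TransitiveClosureData.checkOddEvenResult(
        new BufferedReader(new StringReader(sb.toString())));
}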

Web Log Analysis Data

WebLogAnalysisData provides test data for web log analysis programs.

/**
 * Test data for Web Log Analysis programs
 */
public class WebLogAnalysisData {
    /**
     * Document data for analysis
     */
    public static final String DOCS;

    /**
     * Ranking data
     */
    public static final String RANKS;

    /**
     * Visit log data
     */
    public static final String VISITS;

    /**
     * Expected analysis results
     */
    public static final String EXCEPTED_RESULT;
}
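
Usage Example (a minimal sketch: runWebLogAnalysis stands in for the query under test):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.testdata.WebLogAnalysisData;
import org.apache.flink.test.util.TestBaseUtils;

import org.junit.Test;

import java.util.List;

@Test
public void testWebLogAnalysis() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Load the three input relations, one record per line
    DataSet<String> docs = env.fromElements(WebLogAnalysisData.DOCS.split("\\n"));
    DataSet<String> ranks = env.fromElements(WebLogAnalysisData.RANKS.split("\\n"));
    DataSet<String> visits = env.fromElements(WebLogAnalysisData.VISITS.split("\\n"));
    
    // runWebLogAnalysis is a placeholder for the query under test
    List<String> results = runWebLogAnalysis(docs, ranks, visits).collect();
    
    // Compare against the bundled expected output
    TestBaseUtils.compareResultAsText(results, WebLogAnalysisData.EXCEPTED_RESULT);
}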

Enum Triangle Data

EnumTriangleData provides test data for triangle enumeration algorithms.

/**
 * Test data for Enum Triangle programs
 */
public class EnumTriangleData {
    /**
     * Graph edges for triangle enumeration
     */
    public static final String EDGES;

    /**
     * Expected triangles by ID
     */
    public static final String TRIANGLES_BY_ID;

    /**
     * Expected triangles by degree
     */
    public static final String TRIANGLES_BY_DEGREE;
}
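
Usage Example (a minimal sketch: runTriangleEnumeration stands in for the algorithm under test):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.testdata.EnumTriangleData;
import org.apache.flink.test.util.TestBaseUtils;

import org.junit.Test;

import java.util.List;

@Test
public void testEnumTriangles() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Load the bundled edge list, one edge per line
    DataSet<String> edgesText = env.fromElements(EnumTriangleData.EDGES.split("\\n"));
    
    // runTriangleEnumeration is a placeholder for the implementation under test;
    // it is assumed to emit one triangle per line, matching the expected format
    List<String> triangles = runTriangleEnumeration(edgesText).collect();
    
    // Validate against the expected triangles keyed by vertex ID
    TestBaseUtils.compareResultAsText(triangles, EnumTriangleData.TRIANGLES_BY_ID);
}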

Advanced Data Provider Usage

Custom Data Validation

import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;

import org.junit.Test;

import java.util.List;

@Test
public void testCustomDataValidation() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Load the word count input from the data provider
    DataSet<String> wordCountInput = env.fromElements(WordCountData.TEXT.split("\\n"));
    
    // Process data (WordCountTokenizer is your tokenizer implementation)
    DataSet<String> wordCounts = wordCountInput
        .flatMap(new WordCountTokenizer())
        .groupBy(0)
        .sum(1)
        .map(tuple -> tuple.f0 + " " + tuple.f1);
    
    // Collect the results
    List<String> results = wordCounts.collect();
    
    // Compare the collected results against the expected output text
    TestBaseUtils.compareResultAsText(results, WordCountData.COUNTS);
}

Combining Multiple Test Datasets

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.util.Collector;

import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertFalse;

@Test
public void testMultipleDatasets() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Combine the bundled text with ad-hoc test data
    DataSet<String> combinedData = env.fromElements(
        WordCountData.TEXT.split("\\n")
    ).union(
        env.fromElements("Additional test data", "More test content")
    );
    
    // Split lines into words; the Collector lambda needs an explicit type hint
    DataSet<Integer> wordLengths = combinedData
        .flatMap((String line, Collector<String> out) -> {
            for (String word : line.split("\\s+")) {
                out.collect(word);
            }
        })
        .returns(Types.STRING)
        .map(word -> word.length());
    
    List<Integer> results = wordLengths.collect();
    assertFalse("Should have results", results.isEmpty());
}

Testing with Expected Results Validation

import org.apache.flink.test.testdata.PageRankData;
import org.apache.flink.test.util.TestBaseUtils;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;

@Test
public void testWithExpectedResults() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // Use PageRank data
    DataSet<String> vertices = env.fromElements(PageRankData.VERTICES.split("\\n"));
    DataSet<String> edges = env.fromElements(PageRankData.EDGES.split("\\n"));
    
    // Run the algorithm under test and collect results
    List<String> actualResults = runPageRankAlgorithm(vertices, edges, 3);
    
    // Parse expected results for comparison
    String[] expectedLines = PageRankData.RANKS_AFTER_3_ITERATIONS.split("\\n");
    List<String> expectedResults = Arrays.asList(expectedLines);
    
    // Use TestBaseUtils for comparison with tolerance
    TestBaseUtils.compareResultCollections(
        expectedResults, 
        actualResults, 
        new PageRankComparator(0.01) // custom comparator with delta tolerance
    );
}

Performance Testing with Large Datasets

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.testdata.KMeansData;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@Test
public void testPerformanceWithLargeDataset() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    
    // Generate a larger dataset by replicating the KMeans base points
    List<String> largeDataset = new ArrayList<>();
    String[] basePoints = KMeansData.DATAPOINTS.split("\\n");
    
    for (int i = 0; i < 1000; i++) {
        for (String point : basePoints) {
            largeDataset.add(point);
        }
    }
    
    DataSet<String> largePointSet = env.fromCollection(largeDataset);
    
    long startTime = System.currentTimeMillis();
    
    // Process the large dataset (PointProcessor is the map function under test)
    List<String> results = largePointSet
        .map(new PointProcessor())
        .collect();
    
    long processingTime = System.currentTimeMillis() - startTime;
    
    // Validate results and a coarse performance bound
    assertFalse("Should have processed data", results.isEmpty());
    assertTrue("Processing should complete within reasonable time", 
        processingTime < 30000); // 30 seconds
}