CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils-parent

Comprehensive testing utilities for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

docs/table-testing.md

Table API and Filesystem Testing

Specialized testing utilities for Flink Table API applications and filesystem-based connectors. These utilities enable comprehensive testing of Table API functionality, catalog operations, and filesystem-based data processing.

Capabilities

Filesystem Table Testing

TestFileSystemTableFactory

Test-specific implementation of filesystem table factory for controlled testing scenarios.

/**
 * Test-specific filesystem table factory
 * Extends FileSystemTableFactory with test-friendly features
 */
class TestFileSystemTableFactory extends FileSystemTableFactory {
    /** Create table factory with test configuration */
    TestFileSystemTableFactory();
    
    /** Create table factory with custom test directory */
    TestFileSystemTableFactory(String testDirectory);
    
    /** Get test data directory path */
    String getTestDataDirectory();
    
    /** Clear test data directory */
    void clearTestData();
    
    /** Set up test data files */
    void setupTestData(Map<String, List<String>> tableData);
}

Usage Examples:

import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;

// Create test filesystem table factory
TestFileSystemTableFactory factory = new TestFileSystemTableFactory("/tmp/test-tables");

// Set up test data
Map<String, List<String>> testData = new HashMap<>();
testData.put("users.csv", Arrays.asList(
    "id,name,age",
    "1,Alice,25",
    "2,Bob,30"
));
factory.setupTestData(testData);

// Use in table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.createTemporaryTable("users", 
    TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
            .column("id", DataTypes.INT())
            .column("name", DataTypes.STRING())
            .column("age", DataTypes.INT())
            .build())
        .option("path", factory.getTestDataDirectory() + "/users.csv")
        .option("format", "csv")
        .build());

Test Catalog Support

TestFileSystemCatalog

Test implementation of filesystem-based catalog for Table API testing.

/**
 * Test filesystem catalog implementation
 * Provides in-memory catalog functionality for testing
 */
class TestFileSystemCatalog extends AbstractCatalog {
    /** Create catalog with test configuration */
    TestFileSystemCatalog(String catalogName, String defaultDatabase);
    
    /** Create catalog with custom properties */
    TestFileSystemCatalog(String catalogName, String defaultDatabase, 
                         Map<String, String> properties);
    
    /** Add test table to catalog */
    void addTestTable(ObjectPath tablePath, CatalogTable table);
    
    /** Add test database to catalog */
    void addTestDatabase(String databaseName, CatalogDatabase database);
    
    /** Clear all test data */
    void clearTestData();
    
    /** Get table statistics for testing */
    CatalogTableStatistics getTableStatistics(ObjectPath tablePath);
    
    /** Set table statistics for testing */
    void setTableStatistics(ObjectPath tablePath, CatalogTableStatistics statistics);
}

TestFileSystemCatalogFactory

Factory for creating test filesystem catalogs.

/**
 * Factory for test filesystem catalog
 * Implements CatalogFactory for integration with Table API
 */
class TestFileSystemCatalogFactory implements CatalogFactory {
    /** Get catalog identifier */
    String factoryIdentifier();
    
    /** Get required options */
    Set<ConfigOption<?>> requiredOptions();
    
    /** Get optional options */
    Set<ConfigOption<?>> optionalOptions();
    
    /** Create catalog instance */
    Catalog createCatalog(Context context);
    
    /** Create test catalog with defaults */
    static TestFileSystemCatalog createTestCatalog(String catalogName);
    
    /** Create test catalog with configuration */
    static TestFileSystemCatalog createTestCatalog(String catalogName, Configuration config);
}

Usage Examples:

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory;

// Create test catalog
TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_catalog");

// Add test database
CatalogDatabase testDb = new CatalogDatabaseImpl(
    Map.of("description", "Test database"), 
    "Test database for unit tests"
);
catalog.addTestDatabase("test_db", testDb);

// Add test table
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("name", DataTypes.STRING())
    .primaryKey("id")
    .build();

CatalogTable table = CatalogTable.of(
    schema,
    "Test table for unit tests",
    List.of(),
    Map.of("connector", "filesystem", "path", "/tmp/test", "format", "parquet")
);

ObjectPath tablePath = new ObjectPath("test_db", "test_table");
catalog.addTestTable(tablePath, table);

// Register catalog with table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.registerCatalog("test_catalog", catalog);
tableEnv.useCatalog("test_catalog");

// Query test table
Table result = tableEnv.sqlQuery("SELECT * FROM test_db.test_table");

JSON Serialization Utilities

JsonSerdeUtil

Utilities for JSON serialization in catalog testing scenarios.

/**
 * JSON serialization utilities for catalog testing
 * Provides consistent JSON handling for test data
 */
class JsonSerdeUtil {
    /** Serialize object to JSON string */
    static String toJson(Object object);
    
    /** Deserialize JSON string to object */
    static <T> T fromJson(String json, Class<T> clazz);
    
    /** Deserialize JSON string to generic type */
    static <T> T fromJson(String json, TypeReference<T> typeRef);
    
    /** Serialize map to JSON string */
    static String mapToJson(Map<String, Object> map);
    
    /** Deserialize JSON string to map */
    static Map<String, Object> jsonToMap(String json);
    
    /** Pretty print JSON string */
    static String prettyPrint(String json);
    
    /** Validate JSON format */
    static boolean isValidJson(String json);
}

Usage Examples:

import org.apache.flink.table.file.testutils.catalog.JsonSerdeUtil;

// Serialize catalog metadata
Map<String, Object> catalogMetadata = new HashMap<>();
catalogMetadata.put("version", "1.0");
catalogMetadata.put("tables", Arrays.asList("table1", "table2"));

String json = JsonSerdeUtil.mapToJson(catalogMetadata);
System.out.println(JsonSerdeUtil.prettyPrint(json));

// Deserialize back to map
Map<String, Object> restored = JsonSerdeUtil.jsonToMap(json);
assertEquals(catalogMetadata, restored);

// Serialize/deserialize complex objects
CatalogTableStatistics stats = new CatalogTableStatistics(1000, 10, 100, 50);
String statsJson = JsonSerdeUtil.toJson(stats);
CatalogTableStatistics restoredStats = JsonSerdeUtil.fromJson(statsJson, CatalogTableStatistics.class);
assertEquals(stats, restoredStats);

Table API Testing Patterns

Basic Table Testing Pattern

import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
import org.apache.flink.table.api.TableEnvironment;

@Test
public void testTableProcessing() {
    // Set up test data
    TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
    Map<String, List<String>> testData = new HashMap<>();
    testData.put("input.csv", Arrays.asList(
        "id,value,timestamp",
        "1,100,2023-01-01 10:00:00",
        "2,200,2023-01-01 11:00:00"
    ));
    factory.setupTestData(testData);
    
    // Create table environment
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    
    // Register test table
    tableEnv.createTemporaryTable("input_table",
        TableDescriptor.forConnector("filesystem")
            .schema(Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("value", DataTypes.DOUBLE())
                .column("timestamp", DataTypes.TIMESTAMP())
                .build())
            .option("path", factory.getTestDataDirectory() + "/input.csv")
            .option("format", "csv")
            .build());
    
    // Execute query
    Table result = tableEnv.sqlQuery("SELECT id, value * 2 as doubled_value FROM input_table");
    
    // Collect and verify results
    List<Row> rows = result.execute().collect();
    assertEquals(2, rows.size());
    assertEquals(200.0, rows.get(0).getField("doubled_value"));
}

Catalog Integration Testing

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

@Test
public void testCatalogIntegration() {
    // Create test catalog
    TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_fs_catalog");
    
    // Set up test database and tables
    catalog.addTestDatabase("sales", new CatalogDatabaseImpl(Map.of(), "Sales database"));
    
    Schema orderSchema = Schema.newBuilder()
        .column("order_id", DataTypes.BIGINT())
        .column("customer_id", DataTypes.BIGINT())
        .column("amount", DataTypes.DECIMAL(10, 2))
        .column("order_date", DataTypes.DATE())
        .primaryKey("order_id")
        .build();
    
    CatalogTable ordersTable = CatalogTable.of(
        orderSchema,
        "Orders table",
        List.of(),
        Map.of("connector", "filesystem", "path", "/data/orders", "format", "parquet")
    );
    
    catalog.addTestTable(new ObjectPath("sales", "orders"), ordersTable);
    
    // Register with table environment
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tableEnv.registerCatalog("test_fs_catalog", catalog);
    tableEnv.useCatalog("test_fs_catalog");
    
    // Test catalog operations
    assertTrue(tableEnv.listDatabases().contains("sales"));
    assertTrue(tableEnv.listTables("sales").contains("orders"));
    
    // Test table access
    Table orders = tableEnv.from("sales.orders");
    assertNotNull(orders);
}

Filesystem Connector Testing

@Test
public void testFileSystemConnectorWithPartitioning() {
    TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
    
    // Set up partitioned test data
    Map<String, List<String>> partitionedData = new HashMap<>();
    partitionedData.put("year=2023/month=01/data.csv", Arrays.asList(
        "id,value",
        "1,100",
        "2,200"
    ));
    partitionedData.put("year=2023/month=02/data.csv", Arrays.asList(
        "id,value", 
        "3,300",
        "4,400"
    ));
    factory.setupTestData(partitionedData);
    
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    
    // Create partitioned table
    tableEnv.createTemporaryTable("partitioned_table",
        TableDescriptor.forConnector("filesystem")
            .schema(Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("value", DataTypes.DOUBLE())
                .column("year", DataTypes.INT())
                .column("month", DataTypes.INT())
                .build())
            .partitionedBy("year", "month")
            .option("path", factory.getTestDataDirectory())
            .option("format", "csv")
            .build());
    
    // Test partition pruning
    Table januaryData = tableEnv.sqlQuery(
        "SELECT * FROM partitioned_table WHERE year = 2023 AND month = 1");
    
    List<Row> rows = januaryData.execute().collect();
    assertEquals(2, rows.size());
    
    // Clean up
    factory.clearTestData();
}

Streaming Table Testing with Watermarks

@Test
public void testStreamingTableWithWatermarks() throws Exception {
    TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
    
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    
    // Create streaming source table with watermarks
    tableEnv.createTemporaryTable("events",
        TableDescriptor.forConnector("filesystem")
            .schema(Schema.newBuilder()
                .column("event_id", DataTypes.BIGINT())
                .column("event_time", DataTypes.TIMESTAMP(3))
                .column("user_id", DataTypes.BIGINT())
                .watermark("event_time", "event_time - INTERVAL '5' SECOND")
                .build())
            .option("path", factory.getTestDataDirectory() + "/events")
            .option("format", "json")
            .build());
    
    // Create windowed aggregation
    Table windowedStats = tableEnv.sqlQuery("""
        SELECT 
            user_id,
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
            COUNT(*) as event_count
        FROM events
        GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
        """);
    
    // Test watermark handling
    assertNotNull(windowedStats);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent

docs

client-testing.md

connector-testing.md

core-testing.md

index.md

migration-testing.md

table-testing.md

test-environments.md

tile.json