Comprehensive testing utilities for Apache Flink stream processing framework
—
Specialized testing utilities for Flink Table API applications and filesystem-based connectors. These utilities enable comprehensive testing of Table API functionality, catalog operations, and filesystem-based data processing.
Test-specific implementation of filesystem table factory for controlled testing scenarios.
/**
* Test-specific filesystem table factory
* Extends FileSystemTableFactory with test-friendly features
*/
class TestFileSystemTableFactory extends FileSystemTableFactory {
/** Create table factory with test configuration */
TestFileSystemTableFactory();
/** Create table factory with custom test directory */
TestFileSystemTableFactory(String testDirectory);
/** Get test data directory path */
String getTestDataDirectory();
/** Clear test data directory */
void clearTestData();
/** Set up test data files */
void setupTestData(Map<String, List<String>> tableData);
}

Usage Examples:
import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
// Create test filesystem table factory
TestFileSystemTableFactory factory = new TestFileSystemTableFactory("/tmp/test-tables");
// Set up test data
Map<String, List<String>> testData = new HashMap<>();
testData.put("users.csv", Arrays.asList(
"id,name,age",
"1,Alice,25",
"2,Bob,30"
));
factory.setupTestData(testData);
// Use in table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.createTemporaryTable("users",
TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build())
.option("path", factory.getTestDataDirectory() + "/users.csv")
.option("format", "csv")
.build());

Test implementation of filesystem-based catalog for Table API testing.
/**
* Test filesystem catalog implementation
* Provides in-memory catalog functionality for testing
*/
class TestFileSystemCatalog extends AbstractCatalog {
/** Create catalog with test configuration */
TestFileSystemCatalog(String catalogName, String defaultDatabase);
/** Create catalog with custom properties */
TestFileSystemCatalog(String catalogName, String defaultDatabase,
Map<String, String> properties);
/** Add test table to catalog */
void addTestTable(ObjectPath tablePath, CatalogTable table);
/** Add test database to catalog */
void addTestDatabase(String databaseName, CatalogDatabase database);
/** Clear all test data */
void clearTestData();
/** Get table statistics for testing */
CatalogTableStatistics getTableStatistics(ObjectPath tablePath);
/** Set table statistics for testing */
void setTableStatistics(ObjectPath tablePath, CatalogTableStatistics statistics);
}

Factory for creating test filesystem catalogs.
/**
* Factory for test filesystem catalog
* Implements CatalogFactory for integration with Table API
*/
class TestFileSystemCatalogFactory implements CatalogFactory {
/** Get catalog identifier */
String factoryIdentifier();
/** Get required options */
Set<ConfigOption<?>> requiredOptions();
/** Get optional options */
Set<ConfigOption<?>> optionalOptions();
/** Create catalog instance */
Catalog createCatalog(Context context);
/** Create test catalog with defaults */
static TestFileSystemCatalog createTestCatalog(String catalogName);
/** Create test catalog with configuration */
static TestFileSystemCatalog createTestCatalog(String catalogName, Configuration config);
}

Usage Examples:
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory;
// Create test catalog
TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_catalog");
// Add test database
CatalogDatabase testDb = new CatalogDatabaseImpl(
Map.of("description", "Test database"),
"Test database for unit tests"
);
catalog.addTestDatabase("test_db", testDb);
// Add test table
Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.primaryKey("id")
.build();
CatalogTable table = CatalogTable.of(
schema,
"Test table for unit tests",
List.of(),
Map.of("connector", "filesystem", "path", "/tmp/test", "format", "parquet")
);
ObjectPath tablePath = new ObjectPath("test_db", "test_table");
catalog.addTestTable(tablePath, table);
// Register catalog with table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.registerCatalog("test_catalog", catalog);
tableEnv.useCatalog("test_catalog");
// Query test table
Table result = tableEnv.sqlQuery("SELECT * FROM test_db.test_table");

Utilities for JSON serialization in catalog testing scenarios.
/**
* JSON serialization utilities for catalog testing
* Provides consistent JSON handling for test data
*/
class JsonSerdeUtil {
/** Serialize object to JSON string */
static String toJson(Object object);
/** Deserialize JSON string to object */
static <T> T fromJson(String json, Class<T> clazz);
/** Deserialize JSON string to generic type */
static <T> T fromJson(String json, TypeReference<T> typeRef);
/** Serialize map to JSON string */
static String mapToJson(Map<String, Object> map);
/** Deserialize JSON string to map */
static Map<String, Object> jsonToMap(String json);
/** Pretty print JSON string */
static String prettyPrint(String json);
/** Validate JSON format */
static boolean isValidJson(String json);
}

Usage Examples:
import org.apache.flink.table.file.testutils.catalog.JsonSerdeUtil;
// Serialize catalog metadata
Map<String, Object> catalogMetadata = new HashMap<>();
catalogMetadata.put("version", "1.0");
catalogMetadata.put("tables", Arrays.asList("table1", "table2"));
String json = JsonSerdeUtil.mapToJson(catalogMetadata);
System.out.println(JsonSerdeUtil.prettyPrint(json));
// Deserialize back to map
Map<String, Object> restored = JsonSerdeUtil.jsonToMap(json);
assertEquals(catalogMetadata, restored);
// Serialize/deserialize complex objects
CatalogTableStatistics stats = new CatalogTableStatistics(1000, 10, 100, 50);
String statsJson = JsonSerdeUtil.toJson(stats);
CatalogTableStatistics restoredStats = JsonSerdeUtil.fromJson(statsJson, CatalogTableStatistics.class);
assertEquals(stats, restoredStats);

Integration test example using the filesystem table factory:

import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
import org.apache.flink.table.api.TableEnvironment;
@Test
public void testTableProcessing() {
// Set up test data
TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
Map<String, List<String>> testData = new HashMap<>();
testData.put("input.csv", Arrays.asList(
"id,value,timestamp",
"1,100,2023-01-01 10:00:00",
"2,200,2023-01-01 11:00:00"
));
factory.setupTestData(testData);
// Create table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Register test table
tableEnv.createTemporaryTable("input_table",
TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("value", DataTypes.DOUBLE())
.column("timestamp", DataTypes.TIMESTAMP())
.build())
.option("path", factory.getTestDataDirectory() + "/input.csv")
.option("format", "csv")
.build());
// Execute query
Table result = tableEnv.sqlQuery("SELECT id, value * 2 as doubled_value FROM input_table");
// Collect and verify results
// TableResult.collect() returns a CloseableIterator<Row>, not a List —
// drain it into a list (and close it) before asserting
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = result.execute().collect()) {
    it.forEachRemaining(rows::add);
}
assertEquals(2, rows.size());
assertEquals(200.0, rows.get(0).getField("doubled_value"));
}

Catalog integration test example:

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;
@Test
public void testCatalogIntegration() {
// Create test catalog
TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_fs_catalog");
// Set up test database and tables
catalog.addTestDatabase("sales", new CatalogDatabaseImpl(Map.of(), "Sales database"));
Schema orderSchema = Schema.newBuilder()
.column("order_id", DataTypes.BIGINT())
.column("customer_id", DataTypes.BIGINT())
.column("amount", DataTypes.DECIMAL(10, 2))
.column("order_date", DataTypes.DATE())
.primaryKey("order_id")
.build();
CatalogTable ordersTable = CatalogTable.of(
orderSchema,
"Orders table",
List.of(),
Map.of("connector", "filesystem", "path", "/data/orders", "format", "parquet")
);
catalog.addTestTable(new ObjectPath("sales", "orders"), ordersTable);
// Register with table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.registerCatalog("test_fs_catalog", catalog);
tableEnv.useCatalog("test_fs_catalog");
// Test catalog operations
assertTrue(tableEnv.listDatabases().contains("sales"));
assertTrue(tableEnv.listTables("sales").contains("orders"));
// Test table access
Table orders = tableEnv.from("sales.orders");
assertNotNull(orders);
}

Partitioned filesystem connector test example:

@Test
public void testFileSystemConnectorWithPartitioning() {
TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
// Set up partitioned test data
Map<String, List<String>> partitionedData = new HashMap<>();
partitionedData.put("year=2023/month=01/data.csv", Arrays.asList(
"id,value",
"1,100",
"2,200"
));
partitionedData.put("year=2023/month=02/data.csv", Arrays.asList(
"id,value",
"3,300",
"4,400"
));
factory.setupTestData(partitionedData);
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// Create partitioned table
tableEnv.createTemporaryTable("partitioned_table",
TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("value", DataTypes.DOUBLE())
.column("year", DataTypes.INT())
.column("month", DataTypes.INT())
.build())
.partitionedBy("year", "month")
.option("path", factory.getTestDataDirectory())
.option("format", "csv")
.build());
// Test partition pruning
Table januaryData = tableEnv.sqlQuery(
"SELECT * FROM partitioned_table WHERE year = 2023 AND month = 1");
// collect() yields a CloseableIterator<Row>; materialize it for assertions
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = januaryData.execute().collect()) {
    it.forEachRemaining(rows::add);
}
assertEquals(2, rows.size());
// Clean up
factory.clearTestData();
}

Streaming table with watermarks test example:

@Test
public void testStreamingTableWithWatermarks() throws Exception {
TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Create streaming source table with watermarks
tableEnv.createTemporaryTable("events",
TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("event_id", DataTypes.BIGINT())
.column("event_time", DataTypes.TIMESTAMP(3))
.column("user_id", DataTypes.BIGINT())
.watermark("event_time", "event_time - INTERVAL '5' SECOND")
.build())
.option("path", factory.getTestDataDirectory() + "/events")
.option("format", "json")
.build());
// Create windowed aggregation
Table windowedStats = tableEnv.sqlQuery("""
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
COUNT(*) as event_count
FROM events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
""");
// Test watermark handling
assertNotNull(windowedStats);
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent