# Table API and Filesystem Testing

Specialized testing utilities for Flink Table API applications and filesystem-based connectors. These utilities enable comprehensive testing of Table API functionality, catalog operations, and filesystem-based data processing.

## Capabilities

### Filesystem Table Testing

#### TestFileSystemTableFactory

Test-specific implementation of filesystem table factory for controlled testing scenarios.
```java { .api }
/**
 * Test-specific filesystem table factory.
 * Extends FileSystemTableFactory with test-friendly features.
 */
class TestFileSystemTableFactory extends FileSystemTableFactory {
    /** Create table factory with test configuration */
    TestFileSystemTableFactory();

    /** Create table factory with custom test directory */
    TestFileSystemTableFactory(String testDirectory);

    /** Get test data directory path */
    String getTestDataDirectory();

    /** Clear test data directory */
    void clearTestData();

    /** Set up test data files */
    void setupTestData(Map<String, List<String>> tableData);
}
```
**Usage Examples:**

```java
import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;

// Create test filesystem table factory
TestFileSystemTableFactory factory = new TestFileSystemTableFactory("/tmp/test-tables");

// Set up test data
Map<String, List<String>> testData = new HashMap<>();
testData.put("users.csv", Arrays.asList(
        "id,name,age",
        "1,Alice,25",
        "2,Bob,30"
));
factory.setupTestData(testData);

// Use in table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tableEnv.createTemporaryTable("users",
        TableDescriptor.forConnector("filesystem")
                .schema(Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .build())
                .option("path", factory.getTestDataDirectory() + "/users.csv")
                .option("format", "csv")
                .build());
```
### Test Catalog Support

#### TestFileSystemCatalog

Test implementation of filesystem-based catalog for Table API testing.

```java { .api }
/**
 * Test filesystem catalog implementation.
 * Provides in-memory catalog functionality for testing.
 */
class TestFileSystemCatalog extends AbstractCatalog {
    /** Create catalog with test configuration */
    TestFileSystemCatalog(String catalogName, String defaultDatabase);

    /** Create catalog with custom properties */
    TestFileSystemCatalog(String catalogName, String defaultDatabase,
            Map<String, String> properties);

    /** Add test table to catalog */
    void addTestTable(ObjectPath tablePath, CatalogTable table);

    /** Add test database to catalog */
    void addTestDatabase(String databaseName, CatalogDatabase database);

    /** Clear all test data */
    void clearTestData();

    /** Get table statistics for testing */
    CatalogTableStatistics getTableStatistics(ObjectPath tablePath);

    /** Set table statistics for testing */
    void setTableStatistics(ObjectPath tablePath, CatalogTableStatistics statistics);
}
```
#### TestFileSystemCatalogFactory

Factory for creating test filesystem catalogs.

```java { .api }
/**
 * Factory for test filesystem catalog.
 * Implements CatalogFactory for integration with the Table API.
 */
class TestFileSystemCatalogFactory implements CatalogFactory {
    /** Get catalog identifier */
    String factoryIdentifier();

    /** Get required options */
    Set<ConfigOption<?>> requiredOptions();

    /** Get optional options */
    Set<ConfigOption<?>> optionalOptions();

    /** Create catalog instance */
    Catalog createCatalog(Context context);

    /** Create test catalog with defaults */
    static TestFileSystemCatalog createTestCatalog(String catalogName);

    /** Create test catalog with configuration */
    static TestFileSystemCatalog createTestCatalog(String catalogName, Configuration config);
}
```
**Usage Examples:**

```java
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory;

// Create test catalog
TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_catalog");

// Add test database
CatalogDatabase testDb = new CatalogDatabaseImpl(
        Map.of("description", "Test database"),
        "Test database for unit tests"
);
catalog.addTestDatabase("test_db", testDb);

// Add test table
Schema schema = Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .primaryKey("id")
        .build();

CatalogTable table = CatalogTable.of(
        schema,
        "Test table for unit tests",
        List.of(),
        Map.of("connector", "filesystem", "path", "/tmp/test", "format", "parquet")
);

ObjectPath tablePath = new ObjectPath("test_db", "test_table");
catalog.addTestTable(tablePath, table);

// Register catalog with table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.registerCatalog("test_catalog", catalog);
tableEnv.useCatalog("test_catalog");

// Query test table
Table result = tableEnv.sqlQuery("SELECT * FROM test_db.test_table");
```
### JSON Serialization Utilities

#### JsonSerdeUtil

Utilities for JSON serialization in catalog testing scenarios.

```java { .api }
/**
 * JSON serialization utilities for catalog testing.
 * Provides consistent JSON handling for test data.
 */
class JsonSerdeUtil {
    /** Serialize object to JSON string */
    static String toJson(Object object);

    /** Deserialize JSON string to object */
    static <T> T fromJson(String json, Class<T> clazz);

    /** Deserialize JSON string to generic type */
    static <T> T fromJson(String json, TypeReference<T> typeRef);

    /** Serialize map to JSON string */
    static String mapToJson(Map<String, Object> map);

    /** Deserialize JSON string to map */
    static Map<String, Object> jsonToMap(String json);

    /** Pretty print JSON string */
    static String prettyPrint(String json);

    /** Validate JSON format */
    static boolean isValidJson(String json);
}
```
**Usage Examples:**

```java
import org.apache.flink.table.file.testutils.catalog.JsonSerdeUtil;

// Serialize catalog metadata
Map<String, Object> catalogMetadata = new HashMap<>();
catalogMetadata.put("version", "1.0");
catalogMetadata.put("tables", Arrays.asList("table1", "table2"));

String json = JsonSerdeUtil.mapToJson(catalogMetadata);
System.out.println(JsonSerdeUtil.prettyPrint(json));

// Deserialize back to map
Map<String, Object> restored = JsonSerdeUtil.jsonToMap(json);
assertEquals(catalogMetadata, restored);

// Serialize/deserialize complex objects
CatalogTableStatistics stats = new CatalogTableStatistics(1000, 10, 100, 50);
String statsJson = JsonSerdeUtil.toJson(stats);
CatalogTableStatistics restoredStats = JsonSerdeUtil.fromJson(statsJson, CatalogTableStatistics.class);
assertEquals(stats, restoredStats);
```
## Table API Testing Patterns

### Basic Table Testing Pattern

```java
import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
import org.apache.flink.table.api.TableEnvironment;

@Test
public void testTableProcessing() {
    // Set up test data
    TestFileSystemTableFactory factory = new TestFileSystemTableFactory();
    Map<String, List<String>> testData = new HashMap<>();
    testData.put("input.csv", Arrays.asList(
            "id,value,timestamp",
            "1,100,2023-01-01 10:00:00",
            "2,200,2023-01-01 11:00:00"
    ));
    factory.setupTestData(testData);

    // Create table environment
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Register test table
    tableEnv.createTemporaryTable("input_table",
            TableDescriptor.forConnector("filesystem")
                    .schema(Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("value", DataTypes.DOUBLE())
                            .column("timestamp", DataTypes.TIMESTAMP())
                            .build())
                    .option("path", factory.getTestDataDirectory() + "/input.csv")
                    .option("format", "csv")
                    .build());

    // Execute query
    Table result = tableEnv.sqlQuery("SELECT id, value * 2 as doubled_value FROM input_table");

    // Collect and verify results
    List<Row> rows = result.execute().collect();
    assertEquals(2, rows.size());
    assertEquals(200.0, rows.get(0).getField("doubled_value"));
}
```
### Catalog Integration Testing

```java
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

@Test
public void testCatalogIntegration() {
    // Create test catalog
    TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_fs_catalog");

    // Set up test database and tables
    catalog.addTestDatabase("sales", new CatalogDatabaseImpl(Map.of(), "Sales database"));

    Schema orderSchema = Schema.newBuilder()
            .column("order_id", DataTypes.BIGINT())
            .column("customer_id", DataTypes.BIGINT())
            .column("amount", DataTypes.DECIMAL(10, 2))
            .column("order_date", DataTypes.DATE())
            .primaryKey("order_id")
            .build();

    CatalogTable ordersTable = CatalogTable.of(
            orderSchema,
            "Orders table",
            List.of(),
            Map.of("connector", "filesystem", "path", "/data/orders", "format", "parquet")
    );

    catalog.addTestTable(new ObjectPath("sales", "orders"), ordersTable);

    // Register with table environment
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tableEnv.registerCatalog("test_fs_catalog", catalog);
    tableEnv.useCatalog("test_fs_catalog");

    // Test catalog operations
    assertTrue(tableEnv.listDatabases().contains("sales"));
    assertTrue(tableEnv.listTables("sales").contains("orders"));

    // Test table access
    Table orders = tableEnv.from("sales.orders");
    assertNotNull(orders);
}
```
### Filesystem Connector Testing

```java
@Test
public void testFileSystemConnectorWithPartitioning() {
    TestFileSystemTableFactory factory = new TestFileSystemTableFactory();

    // Set up partitioned test data
    Map<String, List<String>> partitionedData = new HashMap<>();
    partitionedData.put("year=2023/month=01/data.csv", Arrays.asList(
            "id,value",
            "1,100",
            "2,200"
    ));
    partitionedData.put("year=2023/month=02/data.csv", Arrays.asList(
            "id,value",
            "3,300",
            "4,400"
    ));
    factory.setupTestData(partitionedData);

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Create partitioned table
    tableEnv.createTemporaryTable("partitioned_table",
            TableDescriptor.forConnector("filesystem")
                    .schema(Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("value", DataTypes.DOUBLE())
                            .column("year", DataTypes.INT())
                            .column("month", DataTypes.INT())
                            .build())
                    .partitionedBy("year", "month")
                    .option("path", factory.getTestDataDirectory())
                    .option("format", "csv")
                    .build());

    // Test partition pruning
    Table januaryData = tableEnv.sqlQuery(
            "SELECT * FROM partitioned_table WHERE year = 2023 AND month = 1");

    List<Row> rows = januaryData.execute().collect();
    assertEquals(2, rows.size());

    // Clean up
    factory.clearTestData();
}
```
### Streaming Table Testing with Watermarks

```java
@Test
public void testStreamingTableWithWatermarks() throws Exception {
    TestFileSystemTableFactory factory = new TestFileSystemTableFactory();

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Create streaming source table with watermarks
    tableEnv.createTemporaryTable("events",
            TableDescriptor.forConnector("filesystem")
                    .schema(Schema.newBuilder()
                            .column("event_id", DataTypes.BIGINT())
                            .column("event_time", DataTypes.TIMESTAMP(3))
                            .column("user_id", DataTypes.BIGINT())
                            .watermark("event_time", "event_time - INTERVAL '5' SECOND")
                            .build())
                    .option("path", factory.getTestDataDirectory() + "/events")
                    .option("format", "json")
                    .build());

    // Create windowed aggregation
    Table windowedStats = tableEnv.sqlQuery("""
            SELECT
                user_id,
                TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
                COUNT(*) as event_count
            FROM events
            GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
            """);

    // Test watermark handling
    assertNotNull(windowedStats);
}
```