or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-testing.md connector-testing.md core-testing.md index.md migration-testing.md table-testing.md test-environments.md

table-testing.md docs/

0

# Table API and Filesystem Testing

1

2

Specialized testing utilities for Flink Table API applications and filesystem-based connectors. These utilities enable comprehensive testing of Table API functionality, catalog operations, and filesystem-based data processing.

3

4

## Capabilities

5

6

### Filesystem Table Testing

7

8

#### TestFileSystemTableFactory

9

10

Test-specific implementation of the filesystem table factory for controlled testing scenarios.

11

12

```java { .api }

13

/**

14

* Test-specific filesystem table factory

15

* Extends FileSystemTableFactory with test-friendly features

16

*/

17

class TestFileSystemTableFactory extends FileSystemTableFactory {

18

/** Create table factory with test configuration */

19

TestFileSystemTableFactory();

20

21

/** Create table factory with custom test directory */

22

TestFileSystemTableFactory(String testDirectory);

23

24

/** Get test data directory path */

25

String getTestDataDirectory();

26

27

/** Clear test data directory */

28

void clearTestData();

29

30

/** Set up test data files */

31

void setupTestData(Map<String, List<String>> tableData);

32

}

33

```

34

35

**Usage Examples:**

36

37

```java

38

import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;

39

40

// Create test filesystem table factory

41

TestFileSystemTableFactory factory = new TestFileSystemTableFactory("/tmp/test-tables");

42

43

// Set up test data

44

Map<String, List<String>> testData = new HashMap<>();

45

testData.put("users.csv", Arrays.asList(

46

"id,name,age",

47

"1,Alice,25",

48

"2,Bob,30"

49

));

50

factory.setupTestData(testData);

51

52

// Use in table environment

53

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

54

tableEnv.createTemporaryTable("users",

55

TableDescriptor.forConnector("filesystem")

56

.schema(Schema.newBuilder()

57

.column("id", DataTypes.INT())

58

.column("name", DataTypes.STRING())

59

.column("age", DataTypes.INT())

60

.build())

61

.option("path", factory.getTestDataDirectory() + "/users.csv")

62

.option("format", "csv")

63

.build());

64

```

65

66

### Test Catalog Support

67

68

#### TestFileSystemCatalog

69

70

Test implementation of a filesystem-based catalog for Table API testing.

71

72

```java { .api }

73

/**

74

* Test filesystem catalog implementation

75

* Provides in-memory catalog functionality for testing

76

*/

77

class TestFileSystemCatalog extends AbstractCatalog {

78

/** Create catalog with test configuration */

79

TestFileSystemCatalog(String catalogName, String defaultDatabase);

80

81

/** Create catalog with custom properties */

82

TestFileSystemCatalog(String catalogName, String defaultDatabase,

83

Map<String, String> properties);

84

85

/** Add test table to catalog */

86

void addTestTable(ObjectPath tablePath, CatalogTable table);

87

88

/** Add test database to catalog */

89

void addTestDatabase(String databaseName, CatalogDatabase database);

90

91

/** Clear all test data */

92

void clearTestData();

93

94

/** Get table statistics for testing */

95

CatalogTableStatistics getTableStatistics(ObjectPath tablePath);

96

97

/** Set table statistics for testing */

98

void setTableStatistics(ObjectPath tablePath, CatalogTableStatistics statistics);

99

}

100

```

101

102

#### TestFileSystemCatalogFactory

103

104

Factory for creating test filesystem catalogs.

105

106

```java { .api }

107

/**

108

* Factory for test filesystem catalog

109

* Implements CatalogFactory for integration with Table API

110

*/

111

class TestFileSystemCatalogFactory implements CatalogFactory {

112

/** Get catalog identifier */

113

String factoryIdentifier();

114

115

/** Get required options */

116

Set<ConfigOption<?>> requiredOptions();

117

118

/** Get optional options */

119

Set<ConfigOption<?>> optionalOptions();

120

121

/** Create catalog instance */

122

Catalog createCatalog(Context context);

123

124

/** Create test catalog with defaults */

125

static TestFileSystemCatalog createTestCatalog(String catalogName);

126

127

/** Create test catalog with configuration */

128

static TestFileSystemCatalog createTestCatalog(String catalogName, Configuration config);

129

}

130

```

131

132

**Usage Examples:**

133

134

```java

135

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

136

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory;

137

138

// Create test catalog

139

TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_catalog");

140

141

// Add test database

142

CatalogDatabase testDb = new CatalogDatabaseImpl(

143

Map.of("description", "Test database"),

144

"Test database for unit tests"

145

);

146

catalog.addTestDatabase("test_db", testDb);

147

148

// Add test table

149

Schema schema = Schema.newBuilder()

150

.column("id", DataTypes.BIGINT())

151

.column("name", DataTypes.STRING())

152

.primaryKey("id")

153

.build();

154

155

CatalogTable table = CatalogTable.of(

156

schema,

157

"Test table for unit tests",

158

List.of(),

159

Map.of("connector", "filesystem", "path", "/tmp/test", "format", "parquet")

160

);

161

162

ObjectPath tablePath = new ObjectPath("test_db", "test_table");

163

catalog.addTestTable(tablePath, table);

164

165

// Register catalog with table environment

166

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

167

tableEnv.registerCatalog("test_catalog", catalog);

168

tableEnv.useCatalog("test_catalog");

169

170

// Query test table

171

Table result = tableEnv.sqlQuery("SELECT * FROM test_db.test_table");

172

```

173

174

### JSON Serialization Utilities

175

176

#### JsonSerdeUtil

177

178

Utilities for JSON serialization in catalog testing scenarios.

179

180

```java { .api }

181

/**

182

* JSON serialization utilities for catalog testing

183

* Provides consistent JSON handling for test data

184

*/

185

class JsonSerdeUtil {

186

/** Serialize object to JSON string */

187

static String toJson(Object object);

188

189

/** Deserialize JSON string to object */

190

static <T> T fromJson(String json, Class<T> clazz);

191

192

/** Deserialize JSON string to generic type */

193

static <T> T fromJson(String json, TypeReference<T> typeRef);

194

195

/** Serialize map to JSON string */

196

static String mapToJson(Map<String, Object> map);

197

198

/** Deserialize JSON string to map */

199

static Map<String, Object> jsonToMap(String json);

200

201

/** Pretty print JSON string */

202

static String prettyPrint(String json);

203

204

/** Validate JSON format */

205

static boolean isValidJson(String json);

206

}

207

```

208

209

**Usage Examples:**

210

211

```java

212

import org.apache.flink.table.file.testutils.catalog.JsonSerdeUtil;

213

214

// Serialize catalog metadata

215

Map<String, Object> catalogMetadata = new HashMap<>();

216

catalogMetadata.put("version", "1.0");

217

catalogMetadata.put("tables", Arrays.asList("table1", "table2"));

218

219

String json = JsonSerdeUtil.mapToJson(catalogMetadata);

220

System.out.println(JsonSerdeUtil.prettyPrint(json));

221

222

// Deserialize back to map

223

Map<String, Object> restored = JsonSerdeUtil.jsonToMap(json);

224

assertEquals(catalogMetadata, restored);

225

226

// Serialize/deserialize complex objects

227

CatalogTableStatistics stats = new CatalogTableStatistics(1000, 10, 100, 50);

228

String statsJson = JsonSerdeUtil.toJson(stats);

229

CatalogTableStatistics restoredStats = JsonSerdeUtil.fromJson(statsJson, CatalogTableStatistics.class);

230

assertEquals(stats, restoredStats);

231

```

232

233

## Table API Testing Patterns

234

235

### Basic Table Testing Pattern

236

237

```java

238

import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;

239

import org.apache.flink.table.api.TableEnvironment;

240

241

@Test

242

public void testTableProcessing() {

243

// Set up test data

244

TestFileSystemTableFactory factory = new TestFileSystemTableFactory();

245

Map<String, List<String>> testData = new HashMap<>();

246

testData.put("input.csv", Arrays.asList(

247

"id,value,timestamp",

248

"1,100,2023-01-01 10:00:00",

249

"2,200,2023-01-01 11:00:00"

250

));

251

factory.setupTestData(testData);

252

253

// Create table environment

254

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

255

256

// Register test table

257

tableEnv.createTemporaryTable("input_table",

258

TableDescriptor.forConnector("filesystem")

259

.schema(Schema.newBuilder()

260

.column("id", DataTypes.INT())

261

.column("value", DataTypes.DOUBLE())

262

.column("timestamp", DataTypes.TIMESTAMP())

263

.build())

264

.option("path", factory.getTestDataDirectory() + "/input.csv")

265

.option("format", "csv")

266

.build());

267

268

// Execute query

269

Table result = tableEnv.sqlQuery("SELECT id, value * 2 as doubled_value FROM input_table");

270

271

// Collect and verify results

272

List<Row> rows = result.execute().collect();

273

assertEquals(2, rows.size());

274

assertEquals(200.0, rows.get(0).getField("doubled_value"));

275

}

276

```

277

278

### Catalog Integration Testing

279

280

```java

281

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

282

283

@Test

284

public void testCatalogIntegration() {

285

// Create test catalog

286

TestFileSystemCatalog catalog = TestFileSystemCatalogFactory.createTestCatalog("test_fs_catalog");

287

288

// Set up test database and tables

289

catalog.addTestDatabase("sales", new CatalogDatabaseImpl(Map.of(), "Sales database"));

290

291

Schema orderSchema = Schema.newBuilder()

292

.column("order_id", DataTypes.BIGINT())

293

.column("customer_id", DataTypes.BIGINT())

294

.column("amount", DataTypes.DECIMAL(10, 2))

295

.column("order_date", DataTypes.DATE())

296

.primaryKey("order_id")

297

.build();

298

299

CatalogTable ordersTable = CatalogTable.of(

300

orderSchema,

301

"Orders table",

302

List.of(),

303

Map.of("connector", "filesystem", "path", "/data/orders", "format", "parquet")

304

);

305

306

catalog.addTestTable(new ObjectPath("sales", "orders"), ordersTable);

307

308

// Register with table environment

309

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

310

tableEnv.registerCatalog("test_fs_catalog", catalog);

311

tableEnv.useCatalog("test_fs_catalog");

312

313

// Test catalog operations

314

assertTrue(tableEnv.listDatabases().contains("sales"));

315

assertTrue(tableEnv.listTables("sales").contains("orders"));

316

317

// Test table access

318

Table orders = tableEnv.from("sales.orders");

319

assertNotNull(orders);

320

}

321

```

322

323

### Filesystem Connector Testing

324

325

```java

326

@Test

327

public void testFileSystemConnectorWithPartitioning() {

328

TestFileSystemTableFactory factory = new TestFileSystemTableFactory();

329

330

// Set up partitioned test data

331

Map<String, List<String>> partitionedData = new HashMap<>();

332

partitionedData.put("year=2023/month=01/data.csv", Arrays.asList(

333

"id,value",

334

"1,100",

335

"2,200"

336

));

337

partitionedData.put("year=2023/month=02/data.csv", Arrays.asList(

338

"id,value",

339

"3,300",

340

"4,400"

341

));

342

factory.setupTestData(partitionedData);

343

344

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

345

346

// Create partitioned table

347

tableEnv.createTemporaryTable("partitioned_table",

348

TableDescriptor.forConnector("filesystem")

349

.schema(Schema.newBuilder()

350

.column("id", DataTypes.INT())

351

.column("value", DataTypes.DOUBLE())

352

.column("year", DataTypes.INT())

353

.column("month", DataTypes.INT())

354

.build())

355

.partitionedBy("year", "month")

356

.option("path", factory.getTestDataDirectory())

357

.option("format", "csv")

358

.build());

359

360

// Test partition pruning

361

Table januaryData = tableEnv.sqlQuery(

362

"SELECT * FROM partitioned_table WHERE year = 2023 AND month = 1");

363

364

List<Row> rows = januaryData.execute().collect();

365

assertEquals(2, rows.size());

366

367

// Clean up

368

factory.clearTestData();

369

}

370

```

371

372

### Streaming Table Testing with Watermarks

373

374

```java

375

@Test

376

public void testStreamingTableWithWatermarks() throws Exception {

377

TestFileSystemTableFactory factory = new TestFileSystemTableFactory();

378

379

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

380

381

// Create streaming source table with watermarks

382

tableEnv.createTemporaryTable("events",

383

TableDescriptor.forConnector("filesystem")

384

.schema(Schema.newBuilder()

385

.column("event_id", DataTypes.BIGINT())

386

.column("event_time", DataTypes.TIMESTAMP(3))

387

.column("user_id", DataTypes.BIGINT())

388

.watermark("event_time", "event_time - INTERVAL '5' SECOND")

389

.build())

390

.option("path", factory.getTestDataDirectory() + "/events")

391

.option("format", "json")

392

.build());

393

394

// Create windowed aggregation

395

Table windowedStats = tableEnv.sqlQuery("""

396

SELECT

397

user_id,

398

TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,

399

COUNT(*) as event_count

400

FROM events

401

GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)

402

""");

403

404

// Check that the windowed query yields a Table (the query itself is not executed here)

405

assertNotNull(windowedStats);

406

}

407

```