or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.mdcatalog-management.mdexpressions.mdindex.mdsql-integration.mdtable-environment.mdtable-operations.mduser-defined-functions.mdwindow-operations.md

catalog-management.mddocs/

# Catalog and Metadata Management

The catalog system manages metadata for databases, tables, views, and functions in Flink. It provides a centralized registry for these objects and supports pluggable catalog backends. These range from the non-persistent in-memory catalog to catalogs backed by persistent metastores such as Hive.

## Capabilities

### Catalog Registration and Management

Register multiple catalogs in a table environment. Switch the current catalog and database, which are used to resolve object names that are not fully qualified.

```java { .api }

11

/**

12

* Register a catalog instance with the specified name

13

* @param catalogName Name to register catalog under

14

* @param catalog Catalog implementation instance

15

*/

16

public void registerCatalog(String catalogName, Catalog catalog);

17

18

/**

19

* Get a registered catalog by name

20

* @param catalogName Name of the catalog to retrieve

21

* @return Optional containing the catalog if found

22

*/

23

public Optional<Catalog> getCatalog(String catalogName);

24

25

/**

26

* Set the current catalog for table resolution

27

* @param catalogName Name of catalog to set as current

28

*/

29

public void useCatalog(String catalogName);

30

31

/**

32

* Set the current database within the current catalog

33

* @param databaseName Name of database to set as current

34

*/

35

public void useDatabase(String databaseName);

36

37

/**

38

* Get the name of the current catalog

39

* @return Current catalog name

40

*/

41

public String getCurrentCatalog();

42

43

/**

44

* Get the name of the current database

45

* @return Current database name

46

*/

47

public String getCurrentDatabase();

48

```

**Usage Examples:**

```java

53

// Register custom catalog

54

Catalog hiveCatalog = new HiveCatalog("my_hive", "default", hiveConf);

55

tableEnv.registerCatalog("hive_catalog", hiveCatalog);

56

57

// Register in-memory catalog

58

Catalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog");

59

tableEnv.registerCatalog("memory", memoryCatalog);

60

61

// Switch catalog context

62

tableEnv.useCatalog("hive_catalog");

63

tableEnv.useDatabase("production_db");

64

65

// Now table references resolve to hive_catalog.production_db

66

Table prodTable = tableEnv.from("orders"); // resolves to hive_catalog.production_db.orders

67

68

// Fully qualified table access

69

Table specificTable = tableEnv.from("memory.default.temp_table");

70

```

### Catalog Listing Operations

List the available catalogs, databases, tables, and functions for discovery and exploration.

```java { .api }

77

/**

78

* List all registered catalog names

79

* @return Array of catalog names

80

*/

81

public String[] listCatalogs();

82

83

/**

84

* List all databases in the current catalog

85

* @return Array of database names

86

*/

87

public String[] listDatabases();

88

89

/**

90

* List all databases in the specified catalog

91

* @param catalogName Name of catalog to list databases from

92

* @return Array of database names

93

*/

94

public String[] listDatabases(String catalogName);

95

96

/**

97

* List all tables in the current database

98

* @return Array of table names

99

*/

100

public String[] listTables();

101

102

/**

103

* List all tables in the specified database

104

* @param databaseName Database name to list tables from

105

* @return Array of table names

106

*/

107

public String[] listTables(String databaseName);

108

109

/**

110

* List all functions in the current catalog and database

111

* @return Array of function names

112

*/

113

public String[] listFunctions();

114

```

**Usage Examples:**

```java

119

// Discover available catalogs

120

String[] catalogs = tableEnv.listCatalogs();

121

System.out.println("Available catalogs: " + Arrays.toString(catalogs));

122

123

// List databases in current catalog

124

String[] databases = tableEnv.listDatabases();

125

for (String db : databases) {

126

System.out.println("Database: " + db);

127

128

// List tables in each database

129

String[] tables = tableEnv.listTables(db);

130

for (String table : tables) {

131

System.out.println(" Table: " + table);

132

}

133

}

134

135

// List functions

136

String[] functions = tableEnv.listFunctions();

137

System.out.println("Available functions: " + Arrays.toString(functions));

138

```

### Built-in Catalog Implementations

Flink ships with a generic in-memory catalog that needs no external system. Catalogs for other backends, such as Hive, are provided by separate connector modules.

```java { .api }

145

/**

146

* In-memory catalog for testing and temporary metadata storage

147

*/

148

public class GenericInMemoryCatalog implements Catalog {

149

/**

150

* Creates an in-memory catalog with default database

151

* @param catalogName Name of the catalog

152

* @param defaultDatabase Name of the default database

153

*/

154

public GenericInMemoryCatalog(String catalogName, String defaultDatabase);

155

156

/**

157

* Creates an in-memory catalog with "default" as default database

158

* @param catalogName Name of the catalog

159

*/

160

public GenericInMemoryCatalog(String catalogName);

161

}

162

163

/**

164

* Factory for creating in-memory catalogs

165

*/

166

public class GenericInMemoryCatalogFactory implements CatalogFactory {

167

public static final String IDENTIFIER = "generic_in_memory";

168

}

169

```

**Usage Examples:**

```java

174

// Create and register in-memory catalog

175

GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog(

176

"test_catalog",

177

"test_db"

178

);

179

tableEnv.registerCatalog("test", memoryCatalog);

180

181

// Use catalog factory for configuration-based creation

182

Map<String, String> properties = new HashMap<>();

183

properties.put("type", "generic_in_memory");

184

properties.put("default-database", "my_database");

185

186

// Register through SQL DDL

187

tableEnv.executeSql(

188

"CREATE CATALOG test_catalog WITH (" +

189

" 'type' = 'generic_in_memory'," +

190

" 'default-database' = 'my_database'" +

191

")"

192

);

193

```

### Catalog Interface Operations

Core methods of the `Catalog` interface, which custom catalog backends implement. This is an excerpt: the full interface also covers views, partitions, and statistics.

```java { .api }

200

public interface Catalog {

201

/**

202

* Open the catalog and establish connections

203

* @throws CatalogException if opening fails

204

*/

205

void open() throws CatalogException;

206

207

/**

208

* Close the catalog and clean up resources

209

* @throws CatalogException if closing fails

210

*/

211

void close() throws CatalogException;

212

213

/**

214

* Get the default database name

215

* @return Default database name

216

*/

217

String getDefaultDatabase();

218

219

/**

220

* List all database names

221

* @return List of database names

222

* @throws CatalogException if listing fails

223

*/

224

List<String> listDatabases() throws CatalogException;

225

226

/**

227

* Get database metadata

228

* @param databaseName Name of database to retrieve

229

* @return CatalogDatabase with metadata

230

* @throws CatalogException if database not found or error occurs

231

*/

232

CatalogDatabase getDatabase(String databaseName) throws CatalogException;

233

234

/**

235

* Check if database exists

236

* @param databaseName Name of database to check

237

* @return true if database exists

238

* @throws CatalogException if check fails

239

*/

240

boolean databaseExists(String databaseName) throws CatalogException;

241

242

/**

243

* Create a new database

244

* @param databaseName Name of database to create

245

* @param database Database metadata

246

* @param ignoreIfExists If true, don't throw error if database already exists

247

* @throws CatalogException if creation fails

248

*/

249

void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)

250

throws CatalogException;

251

}

252

```

### Table Management Operations

Methods of the `Catalog` interface for managing table metadata. A table is addressed by an `ObjectPath`: a database name plus an object name, relative to the catalog.

```java { .api }

259

/**

260

* List all tables in the specified database

261

* @param databaseName Database name

262

* @return List of table names

263

* @throws CatalogException if listing fails

264

*/

265

List<String> listTables(String databaseName) throws CatalogException;

266

267

/**

268

* Get table metadata

269

* @param tablePath Object path identifying the table

270

* @return CatalogTable with complete metadata

271

* @throws CatalogException if table not found or error occurs

272

*/

273

CatalogTable getTable(ObjectPath tablePath) throws CatalogException;

274

275

/**

276

* Check if table exists

277

* @param tablePath Object path identifying the table

278

* @return true if table exists

279

* @throws CatalogException if check fails

280

*/

281

boolean tableExists(ObjectPath tablePath) throws CatalogException;

282

283

/**

284

* Create a new table

285

* @param tablePath Object path for the new table

286

* @param table Table metadata

287

* @param ignoreIfExists If true, don't throw error if table already exists

288

* @throws CatalogException if creation fails

289

*/

290

void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)

291

throws CatalogException;

292

293

/**

294

* Drop an existing table

295

* @param tablePath Object path identifying the table to drop

296

* @param ignoreIfNotExists If true, don't throw error if table doesn't exist

297

* @throws CatalogException if drop fails

298

*/

299

void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws CatalogException;

300

```

**Usage Examples:**

```java

305

// Create table through catalog interface

306

ObjectPath tablePath = new ObjectPath("my_database", "my_table");

307

308

// Define table schema

309

Schema schema = Schema.newBuilder()

310

.column("id", DataTypes.BIGINT())

311

.column("name", DataTypes.STRING())

312

.column("created_at", DataTypes.TIMESTAMP(3))

313

.primaryKey("id")

314

.build();

315

316

// Create catalog table

317

Map<String, String> properties = new HashMap<>();

318

properties.put("connector", "filesystem");

319

properties.put("path", "/path/to/data");

320

properties.put("format", "parquet");

321

322

CatalogTable catalogTable = CatalogTable.of(

323

schema,

324

"Customer data table",

325

Collections.emptyList(),

326

properties

327

);

328

329

// Create table in catalog

330

catalog.createTable(tablePath, catalogTable, false);

331

332

// List tables

333

List<String> tables = catalog.listTables("my_database");

334

System.out.println("Tables: " + tables);

335

336

// Check if table exists

337

boolean exists = catalog.tableExists(tablePath);

338

System.out.println("Table exists: " + exists);

339

```

### Function Management

Methods of the `Catalog` interface for managing user-defined functions stored in a catalog.

```java { .api }

346

/**

347

* List all functions in the specified database

348

* @param databaseName Database name

349

* @return List of function names

350

* @throws CatalogException if listing fails

351

*/

352

List<String> listFunctions(String databaseName) throws CatalogException;

353

354

/**

355

* Get function metadata

356

* @param functionPath Object path identifying the function

357

* @return CatalogFunction with metadata

358

* @throws CatalogException if function not found or error occurs

359

*/

360

CatalogFunction getFunction(ObjectPath functionPath) throws CatalogException;

361

362

/**

363

* Check if function exists

364

* @param functionPath Object path identifying the function

365

* @return true if function exists

366

* @throws CatalogException if check fails

367

*/

368

boolean functionExists(ObjectPath functionPath) throws CatalogException;

369

370

/**

371

* Create a new function

372

* @param functionPath Object path for the new function

373

* @param function Function metadata

374

* @param ignoreIfExists If true, don't throw error if function already exists

375

* @throws CatalogException if creation fails

376

*/

377

void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)

378

throws CatalogException;

379

```

### Context-Resolved Objects

Tables and functions that have been resolved within a specific catalog context, together with their full metadata. These types are mainly used internally during query planning.

```java { .api }

386

public interface ContextResolvedTable {

387

/**

388

* Get the identifier for this table

389

* @return Table identifier

390

*/

391

Identifier getIdentifier();

392

393

/**

394

* Get the resolved table

395

* @return CatalogTable with full metadata

396

*/

397

CatalogTable getTable();

398

399

/**

400

* Get the resolved schema

401

* @return ResolvedSchema for the table

402

*/

403

ResolvedSchema getResolvedSchema();

404

405

/**

406

* Check if this is a temporary table

407

* @return true if temporary

408

*/

409

boolean isTemporary();

410

}

411

412

public interface ContextResolvedFunction {

413

/**

414

* Get the function identifier

415

* @return Function identifier

416

*/

417

Identifier getIdentifier();

418

419

/**

420

* Get the catalog function metadata

421

* @return CatalogFunction with metadata

422

*/

423

CatalogFunction getCatalogFunction();

424

425

/**

426

* Get the function definition

427

* @return FunctionDefinition for execution

428

*/

429

FunctionDefinition getFunctionDefinition();

430

}

431

```

### Database and Table Metadata Types

Metadata structures that describe databases and tables stored in a catalog.

```java { .api }

438

public interface CatalogDatabase {

439

/**

440

* Get database properties

441

* @return Map of property key-value pairs

442

*/

443

Map<String, String> getProperties();

444

445

/**

446

* Get database comment/description

447

* @return Database description

448

*/

449

String getComment();

450

}

451

452

public interface CatalogTable extends CatalogBaseTable {

453

/**

454

* Check if this table is partitioned

455

* @return true if partitioned

456

*/

457

boolean isPartitioned();

458

459

/**

460

* Get partition keys for partitioned tables

461

* @return List of partition key column names

462

*/

463

List<String> getPartitionKeys();

464

465

/**

466

* Create a copy of this table with new properties

467

* @param properties New properties map

468

* @return New CatalogTable with updated properties

469

*/

470

CatalogTable copy(Map<String, String> properties);

471

}

472

473

public class CatalogTableImpl implements CatalogTable {

474

/**

475

* Creates a catalog table implementation

476

* @param schema Table schema

477

* @param partitionKeys Partition key columns

478

* @param properties Table properties

479

* @param comment Table description

480

*/

481

public CatalogTableImpl(

482

Schema schema,

483

List<String> partitionKeys,

484

Map<String, String> properties,

485

String comment

486

);

487

}

488

```

**Usage Examples:**

```java

493

// Create database metadata

494

Map<String, String> dbProps = new HashMap<>();

495

dbProps.put("location", "/warehouse/analytics");

496

dbProps.put("owner", "analytics_team");

497

498

CatalogDatabase database = new CatalogDatabaseImpl(

499

dbProps,

500

"Analytics database for business intelligence"

501

);

502

503

// Create table metadata with partitioning

504

Schema tableSchema = Schema.newBuilder()

505

.column("transaction_id", DataTypes.BIGINT())

506

.column("customer_id", DataTypes.BIGINT())

507

.column("amount", DataTypes.DECIMAL(10, 2))

508

.column("transaction_date", DataTypes.DATE())

509

.column("region", DataTypes.STRING())

510

.build();

511

512

List<String> partitionKeys = Arrays.asList("transaction_date", "region");

513

514

Map<String, String> tableProps = new HashMap<>();

515

tableProps.put("connector", "filesystem");

516

tableProps.put("path", "/data/transactions");

517

tableProps.put("format", "parquet");

518

519

CatalogTable partitionedTable = new CatalogTableImpl(

520

tableSchema,

521

partitionKeys,

522

tableProps,

523

"Daily transaction data partitioned by date and region"

524

);

525

526

// Create in catalog

527

ObjectPath tablePath = new ObjectPath("analytics", "transactions");

528

catalog.createTable(tablePath, partitionedTable, false);

529

```