# Catalog System

Flink's catalog system provides pluggable metadata management for databases, tables, views, and functions, and lets you plug in user-defined catalog implementations. It supports persistent storage, schema evolution, and integration with external metadata systems such as the Hive Metastore.

## Capabilities

### Catalog Management

Register multiple catalogs and switch between different metadata repositories.

```java { .api }

11

/**

12

* Registers a catalog under a unique name

13

* @param catalogName Name for the catalog

14

* @param catalog Catalog implementation to register

15

*/

16

void registerCatalog(String catalogName, Catalog catalog);

17

18

/**

19

* Gets a registered catalog by name

20

* @param catalogName Name of the catalog to retrieve

21

* @return Optional containing the catalog if found

22

*/

23

Optional<Catalog> getCatalog(String catalogName);

24

25

/**

26

* Sets the current catalog for table operations

27

* @param catalogName Name of the catalog to use as current

28

*/

29

void useCatalog(String catalogName);

30

31

/**

32

* Gets the name of the current catalog

33

* @return Current catalog name

34

*/

35

String getCurrentCatalog();

36

37

/**

38

* Lists all registered catalog names

39

* @return Array of catalog names

40

*/

41

String[] listCatalogs();

42

```

**Usage Examples:**

```java

47

// Register different catalog types

48

HiveCatalog hiveCatalog = new HiveCatalog(

49

"my_hive",

50

"default",

51

"path/to/hive-conf",

52

"2.3.4"

53

);

54

tableEnv.registerCatalog("hive_catalog", hiveCatalog);

55

56

JdbcCatalog jdbcCatalog = new JdbcCatalog(

57

"my_jdbc_catalog",

58

"default",

59

"postgres",

60

"jdbc:postgresql://localhost:5432/metadata",

61

"username",

62

"password"

63

);

64

tableEnv.registerCatalog("postgres_catalog", jdbcCatalog);

65

66

// Switch between catalogs

67

tableEnv.useCatalog("hive_catalog");

68

String[] hiveTables = tableEnv.listTables();

69

70

tableEnv.useCatalog("postgres_catalog");

71

String[] postgresTables = tableEnv.listTables();

72

73

// Get current context

74

String currentCatalog = tableEnv.getCurrentCatalog();

75

```

### Database Operations

Manage databases within a catalog: create, drop, list, inspect, and check for existence.

```java { .api }

82

interface Catalog {

83

/**

84

* Creates a database in the catalog

85

* @param name Database name

86

* @param database Database metadata

87

* @param ignoreIfExists Skip creation if database already exists

88

*/

89

void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)

90

throws DatabaseAlreadyExistException, CatalogException;

91

92

/**

93

* Drops a database from the catalog

94

* @param name Database name to drop

95

* @param ignoreIfNotExists Skip error if database doesn't exist

96

* @param cascade Drop all tables in the database

97

*/

98

void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)

99

throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;

100

101

/**

102

* Lists all databases in the catalog

103

* @return List of database names

104

*/

105

List<String> listDatabases() throws CatalogException;

106

107

/**

108

* Gets database metadata

109

* @param databaseName Name of the database

110

* @return CatalogDatabase containing metadata

111

*/

112

CatalogDatabase getDatabase(String databaseName)

113

throws DatabaseNotExistException, CatalogException;

114

115

/**

116

* Checks if a database exists

117

* @param databaseName Name of the database to check

118

* @return true if database exists

119

*/

120

boolean databaseExists(String databaseName) throws CatalogException;

121

}

122

```

**Usage Examples:**

```java

127

// Create database with properties

128

Map<String, String> dbProperties = new HashMap<>();

129

dbProperties.put("owner", "analytics_team");

130

dbProperties.put("created_date", "2024-01-01");

131

132

CatalogDatabase analyticsDb = new CatalogDatabaseImpl(

133

dbProperties,

134

"Database for analytics workflows"

135

);

136

137

Catalog catalog = tableEnv.getCatalog("hive_catalog").get();

138

catalog.createDatabase("analytics", analyticsDb, false);

139

140

// Use the new database

141

tableEnv.useDatabase("analytics");

142

143

// List databases

144

List<String> databases = catalog.listDatabases();

145

for (String db : databases) {

146

System.out.println("Database: " + db);

147

}

148

```

### Table Operations

Manage tables and views in a catalog, including their schema, partition keys, constraints, and connector options.

```java { .api }

155

interface Catalog {

156

/**

157

* Creates a table in the catalog

158

* @param tablePath Path to the table (database.table)

159

* @param table Table metadata and schema

160

* @param ignoreIfExists Skip creation if table already exists

161

*/

162

void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists)

163

throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

164

165

/**

166

* Drops a table from the catalog

167

* @param tablePath Path to the table to drop

168

* @param ignoreIfNotExists Skip error if table doesn't exist

169

*/

170

void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)

171

throws TableNotExistException, CatalogException;

172

173

/**

174

* Lists all tables in a database

175

* @param databaseName Database name

176

* @return List of table names

177

*/

178

List<String> listTables(String databaseName)

179

throws DatabaseNotExistException, CatalogException;

180

181

/**

182

* Gets table metadata

183

* @param tablePath Path to the table

184

* @return CatalogTable containing metadata

185

*/

186

CatalogTable getTable(ObjectPath tablePath)

187

throws TableNotExistException, CatalogException;

188

189

/**

190

* Checks if a table exists

191

* @param tablePath Path to the table to check

192

* @return true if table exists

193

*/

194

boolean tableExists(ObjectPath tablePath) throws CatalogException;

195

196

/**

197

* Renames a table

198

* @param tablePath Current table path

199

* @param newTableName New table name

200

*/

201

void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)

202

throws TableNotExistException, TableAlreadyExistException, CatalogException;

203

204

/**

205

* Alters table metadata

206

* @param tablePath Path to the table

207

* @param newTable New table metadata

208

* @param ignoreIfNotExists Skip error if table doesn't exist

209

*/

210

void alterTable(ObjectPath tablePath, CatalogTable newTable, boolean ignoreIfNotExists)

211

throws TableNotExistException, CatalogException;

212

}

213

```

**Usage Examples:**

```java

218

// Create table with comprehensive metadata

219

Schema schema = Schema.newBuilder()

220

.column("order_id", DataTypes.BIGINT())

221

.column("customer_id", DataTypes.BIGINT())

222

.column("product_id", DataTypes.BIGINT())

223

.column("quantity", DataTypes.INT())

224

.column("unit_price", DataTypes.DECIMAL(10, 2))

225

.column("order_date", DataTypes.DATE())

226

.column("region", DataTypes.STRING())

227

.primaryKey("order_id")

228

.build();

229

230

Map<String, String> properties = new HashMap<>();

231

properties.put("connector", "kafka");

232

properties.put("topic", "orders");

233

properties.put("properties.bootstrap.servers", "localhost:9092");

234

properties.put("format", "json");

235

236

CatalogTable ordersTable = CatalogTable.of(

237

schema,

238

"Orders table for e-commerce analytics",

239

Arrays.asList("region", "order_date"), // Partition keys

240

properties

241

);

242

243

ObjectPath tablePath = new ObjectPath("analytics", "orders");

244

catalog.createTable(tablePath, ordersTable, false);

245

246

// List and inspect tables

247

List<String> tables = catalog.listTables("analytics");

248

for (String tableName : tables) {

249

ObjectPath path = new ObjectPath("analytics", tableName);

250

CatalogTable table = catalog.getTable(path);

251

System.out.println("Table: " + tableName);

252

System.out.println("Schema: " + table.getUnresolvedSchema());

253

System.out.println("Properties: " + table.getOptions());

254

}

255

```

### Function Management

Manage user-defined functions stored in a catalog, together with metadata such as their implementation class and language.

```java { .api }

262

interface Catalog {

263

/**

264

* Creates a function in the catalog

265

* @param functionPath Path to the function (database.function)

266

* @param function Function metadata

267

* @param ignoreIfExists Skip creation if function already exists

268

*/

269

void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)

270

throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

271

272

/**

273

* Drops a function from the catalog

274

* @param functionPath Path to the function to drop

275

* @param ignoreIfNotExists Skip error if function doesn't exist

276

*/

277

void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)

278

throws FunctionNotExistException, CatalogException;

279

280

/**

281

* Lists all functions in a database

282

* @param databaseName Database name

283

* @return List of function names

284

*/

285

List<String> listFunctions(String databaseName)

286

throws DatabaseNotExistException, CatalogException;

287

288

/**

289

* Gets function metadata

290

* @param functionPath Path to the function

291

* @return CatalogFunction containing metadata

292

*/

293

CatalogFunction getFunction(ObjectPath functionPath)

294

throws FunctionNotExistException, CatalogException;

295

296

/**

297

* Checks if a function exists

298

* @param functionPath Path to the function to check

299

* @return true if function exists

300

*/

301

boolean functionExists(ObjectPath functionPath) throws CatalogException;

302

}

303

```

**Usage Examples:**

```java

308

// Register UDF in catalog

309

CatalogFunction myFunction = new CatalogFunctionImpl(

310

"com.company.functions.MyCustomFunction",

311

FunctionLanguage.JAVA,

312

Arrays.asList("dependency1.jar", "dependency2.jar"),

313

"Custom function for business logic"

314

);

315

316

ObjectPath functionPath = new ObjectPath("analytics", "my_custom_function");

317

catalog.createFunction(functionPath, myFunction, false);

318

319

// Use function in SQL

320

tableEnv.useCatalog("hive_catalog");

321

tableEnv.useDatabase("analytics");

322

Table result = tableEnv.sqlQuery(

323

"SELECT customer_id, my_custom_function(customer_data) as processed_data " +

324

"FROM customers"

325

);

326

```

### Partition Management

Inspect the partitions of partitioned tables: list all partitions, filter them by a partial partition specification, and look up or check individual partitions.

```java { .api }

333

interface Catalog {

334

/**

335

* Lists all partitions of a partitioned table

336

* @param tablePath Path to the partitioned table

337

* @return List of partition specifications

338

*/

339

List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)

340

throws TableNotExistException, TableNotPartitionedException, CatalogException;

341

342

/**

343

* Lists partitions matching a partial specification

344

* @param tablePath Path to the partitioned table

345

* @param partitionSpec Partial partition specification for filtering

346

* @return List of matching partition specifications

347

*/

348

List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

349

throws TableNotExistException, TableNotPartitionedException, CatalogException;

350

351

/**

352

* Gets partition metadata

353

* @param tablePath Path to the partitioned table

354

* @param partitionSpec Partition specification

355

* @return CatalogPartition containing metadata

356

*/

357

CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

358

throws PartitionNotExistException, CatalogException;

359

360

/**

361

* Checks if a partition exists

362

* @param tablePath Path to the partitioned table

363

* @param partitionSpec Partition specification to check

364

* @return true if partition exists

365

*/

366

boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

367

throws CatalogException;

368

}

369

```

**Usage Examples:**

```java

374

// Work with partitioned tables

375

ObjectPath partitionedTable = new ObjectPath("analytics", "daily_sales");

376

377

// List all partitions

378

List<CatalogPartitionSpec> allPartitions = catalog.listPartitions(partitionedTable);

379

for (CatalogPartitionSpec spec : allPartitions) {

380

System.out.println("Partition: " + spec.getPartitionSpec());

381

}

382

383

// List partitions for specific year

384

CatalogPartitionSpec yearFilter = new CatalogPartitionSpec(

385

Collections.singletonMap("year", "2024")

386

);

387

List<CatalogPartitionSpec> yearPartitions = catalog.listPartitions(partitionedTable, yearFilter);

388

389

// Check specific partition

390

CatalogPartitionSpec specificPartition = new CatalogPartitionSpec(

391

Map.of("year", "2024", "month", "01", "day", "15")

392

);

393

boolean exists = catalog.partitionExists(partitionedTable, specificPartition);

394

```

### Object Path Resolution

Identify catalog objects either with a catalog-relative `ObjectPath` (database and object name) or with a fully qualified `ObjectIdentifier` (catalog, database, and object name).

```java { .api }

401

class ObjectPath {

402

/**

403

* Creates an object path for database.object

404

* @param databaseName Database name

405

* @param objectName Object name (table, function, etc.)

406

*/

407

ObjectPath(String databaseName, String objectName);

408

409

/**

410

* Gets the database name

411

* @return Database name

412

*/

413

String getDatabaseName();

414

415

/**

416

* Gets the object name

417

* @return Object name

418

*/

419

String getObjectName();

420

421

/**

422

* Gets the full path as a string

423

* @return String representation of the path

424

*/

425

String getFullName();

426

}

427

428

class ObjectIdentifier {

429

/**

430

* Creates a full object identifier

431

* @param catalogName Catalog name

432

* @param databaseName Database name

433

* @param objectName Object name

434

*/

435

static ObjectIdentifier of(String catalogName, String databaseName, String objectName);

436

437

/**

438

* Gets the catalog name

439

* @return Catalog name

440

*/

441

String getCatalogName();

442

443

/**

444

* Gets the database name

445

* @return Database name

446

*/

447

String getDatabaseName();

448

449

/**

450

* Gets the object name

451

* @return Object name

452

*/

453

String getObjectName();

454

}

455

```

**Usage Examples:**

```java

460

// Object path resolution

461

ObjectPath tablePath = new ObjectPath("sales_db", "orders");

462

ObjectIdentifier fullIdentifier = ObjectIdentifier.of("hive_catalog", "sales_db", "orders");

463

464

// Use in catalog operations

465

CatalogTable table = catalog.getTable(tablePath);

466

String fullPath = fullIdentifier.getCatalogName() + "." +

467

fullIdentifier.getDatabaseName() + "." +

468

fullIdentifier.getObjectName();

469

```

## Types

### Catalog Interfaces

```java { .api }

476

interface Catalog {

477

// Database operations

478

void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);

479

void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);

480

List<String> listDatabases();

481

CatalogDatabase getDatabase(String databaseName);

482

boolean databaseExists(String databaseName);

483

484

// Table operations

485

void createTable(ObjectPath tablePath, CatalogTable table, boolean ignoreIfExists);

486

void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);

487

List<String> listTables(String databaseName);

488

CatalogTable getTable(ObjectPath tablePath);

489

boolean tableExists(ObjectPath tablePath);

490

491

// Function operations

492

void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists);

493

void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists);

494

List<String> listFunctions(String databaseName);

495

CatalogFunction getFunction(ObjectPath functionPath);

496

boolean functionExists(ObjectPath functionPath);

497

}

498

```

### Catalog Metadata Types

```java { .api }

503

interface CatalogDatabase {

504

Map<String, String> getProperties();

505

String getComment();

506

CatalogDatabase copy();

507

CatalogDatabase copy(Map<String, String> properties);

508

}

509

510

interface CatalogTable extends CatalogBaseTable {

511

boolean isPartitioned();

512

List<String> getPartitionKeys();

513

CatalogTable copy();

514

CatalogTable copy(Map<String, String> options);

515

}

516

517

interface CatalogFunction {

518

String getClassName();

519

FunctionLanguage getLanguage();

520

List<String> getFunctionResources();

521

String getDescription();

522

CatalogFunction copy();

523

}

524

525

enum FunctionLanguage {

526

JVM,

527

PYTHON,

528

SCALA

529

}

530

```

### Exception Types

```java { .api }

// CatalogException is unchecked and signals unexpected catalog failures
// (e.g. the backing metastore is unreachable).
class CatalogException extends RuntimeException { }

// The remaining exceptions are checked and signal specific conditions callers can handle.
// They extend Exception directly, not CatalogException.
class DatabaseAlreadyExistException extends Exception { }
class DatabaseNotExistException extends Exception { }
class DatabaseNotEmptyException extends Exception { }
class TableAlreadyExistException extends Exception { }
class TableNotExistException extends Exception { }
class FunctionAlreadyExistException extends Exception { }
class FunctionNotExistException extends Exception { }
class PartitionNotExistException extends Exception { }
class PartitionSpecInvalidException extends Exception { }
class TableNotPartitionedException extends Exception { }

```