or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-management.md, complex-event-processing.md, core-table-operations.md, datastream-integration.md, index.md, sql-processing.md, type-system.md, user-defined-functions.md, window-operations.md

docs/catalog-management.md

0

# Catalog and Metadata Management

1

2

This document covers registering multiple catalogs and managing database and table metadata in Apache Flink Table Uber Blink.

3

4

## Catalog Operations

5

6

### Catalog Registration

7

8

```java { .api }

9

interface TableEnvironment {

10

void registerCatalog(String catalogName, Catalog catalog);

11

Optional<Catalog> getCatalog(String catalogName);

12

void useCatalog(String catalogName);

13

String getCurrentCatalog();

14

void useDatabase(String databaseName);

15

String getCurrentDatabase();

16

}

17

```

18

19

**Usage:**

20

21

```java

22

// Register Hive catalog

23

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");

24

tEnv.registerCatalog("myhive", hiveCatalog);

25

26

// Register JDBC catalog

27

// The base URL must not contain a database name; the default database ("testdb") is passed separately
Catalog jdbcCatalog = new PostgresCatalog("mypg", "testdb", "user", "pass", "jdbc:postgresql://localhost:5432");

28

tEnv.registerCatalog("mypg", jdbcCatalog);

29

30

// Switch catalog context

31

tEnv.useCatalog("myhive");

32

tEnv.useDatabase("production");

33

```

34

35

## Built-in Catalogs

36

37

### Generic In-Memory Catalog

38

39

```java { .api }

40

class GenericInMemoryCatalog implements Catalog {

41

GenericInMemoryCatalog(String name);

42

GenericInMemoryCatalog(String name, String defaultDatabase);

43

}

44

```

45

46

**Usage:**

47

48

```java

49

// Create and register in-memory catalog

50

GenericInMemoryCatalog memoryCatalog = new GenericInMemoryCatalog("memory_catalog", "my_db");

51

tEnv.registerCatalog("memory", memoryCatalog);

52

```

53

54

### Hive Catalog

55

56

```java { .api }

57

class HiveCatalog implements Catalog {

58

HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir);

59

HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hiveVersion);

60

HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf);

61

HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion);

62

}

63

```

64

65

**Usage:**

66

67

```java

68

// Register Hive catalog

69

HiveCatalog hive = new HiveCatalog(

70

"myhive", // catalog name

71

"default", // default database

72

"/opt/hive/conf", // hive conf directory

73

"2.3.4" // hive version

74

);

75

tEnv.registerCatalog("myhive", hive);

76

77

// Use Hive tables

78

tEnv.useCatalog("myhive");

79

Table hiveTable = tEnv.from("hive_database.hive_table");

80

```

81

82

### JDBC Catalogs

83

84

```java { .api }

85

class JdbcCatalog implements Catalog {

86

JdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);

87

}

88

89

class PostgresCatalog extends AbstractJdbcCatalog {

90

PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);

91

}

92

93

class MySqlCatalog extends AbstractJdbcCatalog {

94

MySqlCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl);

95

}

96

```

97

98

**Usage:**

99

100

```java

101

// PostgreSQL catalog

102

PostgresCatalog pgCatalog = new PostgresCatalog(

103

"mypg",

104

"postgres",

105

"user",

106

"password",

107

"jdbc:postgresql://localhost:5432/"

108

);

109

tEnv.registerCatalog("mypg", pgCatalog);

110

111

// MySQL catalog

112

MySqlCatalog mysqlCatalog = new MySqlCatalog(

113

"mysql",

114

"test",

115

"root",

116

"root",

117

"jdbc:mysql://localhost:3306"

118

);

119

tEnv.registerCatalog("mysql", mysqlCatalog);

120

```

121

122

## Metadata Listing

123

124

### Listing Operations

125

126

```java { .api }

127

interface TableEnvironment {

128

String[] listCatalogs();

129

String[] listDatabases();

130

String[] listTables();

131

String[] listViews();

132

String[] listUserDefinedFunctions();

133

String[] listFunctions();

134

String[] listModules();

135

}

136

```

137

138

**Usage:**

139

140

```java

141

// List all metadata

142

String[] catalogs = tEnv.listCatalogs();

143

String[] databases = tEnv.listDatabases();

144

String[] tables = tEnv.listTables();

145

String[] views = tEnv.listViews();

146

String[] functions = tEnv.listUserDefinedFunctions();

147

148

// Print metadata hierarchy (note: useCatalog/useDatabase below change the session's current catalog and database)

149

for (String catalog : catalogs) {

150

System.out.println("Catalog: " + catalog);

151

tEnv.useCatalog(catalog);

152

153

for (String database : tEnv.listDatabases()) {

154

System.out.println(" Database: " + database);

155

tEnv.useDatabase(database);

156

157

for (String table : tEnv.listTables()) {

158

System.out.println(" Table: " + table);

159

}

160

}

161

}

162

```

163

164

## Database Operations

165

166

### Database Management

167

168

```java { .api }

169

interface Catalog {

170

boolean databaseExists(String databaseName);

171

void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);

172

void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade);

173

void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists);

174

List<String> listDatabases();

175

CatalogDatabase getDatabase(String databaseName);

176

}

177

```

178

179

**Usage:**

180

181

```java

182

// Create database

183

CatalogDatabase newDatabase = new CatalogDatabaseImpl(

184

Map.of("location", "/path/to/database"),

185

"My custom database"

186

);

187

188

Catalog catalog = tEnv.getCatalog("myhive").get();

189

catalog.createDatabase("my_db", newDatabase, false);

190

191

// Check database existence

192

boolean exists = catalog.databaseExists("my_db");

193

194

// Get database metadata

195

CatalogDatabase dbMeta = catalog.getDatabase("my_db");

196

Map<String, String> properties = dbMeta.getProperties();

197

String comment = dbMeta.getComment();

198

```

199

200

### Database SQL Operations

201

202

```sql

203

-- Create database

204

CREATE DATABASE [IF NOT EXISTS] database_name

205

[COMMENT 'comment']

206

[WITH (key1=val1, key2=val2, ...)];

207

208

-- Drop database

209

DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];

210

211

-- Show databases

212

SHOW DATABASES;

213

214

-- Describe database

215

DESCRIBE DATABASE database_name;

216

DESC DATABASE database_name;

217

218

-- Use database

219

USE database_name;

220

```

221

222

## Table Operations

223

224

### Table Management

225

226

```java { .api }

227

interface Catalog {

228

boolean tableExists(ObjectPath tablePath);

229

void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);

230

void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);

231

void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists);

232

List<String> listTables(String databaseName);

233

CatalogBaseTable getTable(ObjectPath tablePath);

234

CatalogTableStatistics getTableStatistics(ObjectPath tablePath);

236

CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath);

237

}

238

```

239

240

**Usage:**

241

242

```java

243

ObjectPath tablePath = new ObjectPath("my_db", "my_table");

244

245

// Check table existence

246

boolean tableExists = catalog.tableExists(tablePath);

247

248

// Get table metadata

249

CatalogBaseTable table = catalog.getTable(tablePath);

250

Map<String, String> options = table.getOptions();

251

TableSchema schema = table.getSchema();

252

String comment = table.getComment();

253

254

// Get table statistics

255

CatalogTableStatistics stats = catalog.getTableStatistics(tablePath);

256

long rowCount = stats.getRowCount();

257

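// Column statistics are fetched separately from table-level statistics
Map<String, CatalogColumnStatisticsDataBase> columnStats = catalog.getTableColumnStatistics(tablePath).getColumnStatisticsData();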

258

```

259

260

### Object Path Handling

261

262

```java { .api }

263

class ObjectPath {

264

ObjectPath(String databaseName, String objectName);

265

String getDatabaseName();

266

String getObjectName();

267

String getFullName();

268

269

static ObjectPath fromString(String fullName);

270

}

271

```

272

273

**Usage:**

274

275

```java

276

// Create object paths

277

ObjectPath path1 = new ObjectPath("database", "table");

278

ObjectPath path2 = ObjectPath.fromString("database.table");

279

280

// Multi-part identifiers

281

String fullName = "catalog.database.table";

282

// Parse into components for object path

283

String[] parts = fullName.split("\\.");

284

ObjectPath path = new ObjectPath(parts[1], parts[2]); // database.table

285

```

286

287

## Custom Catalog Implementation

288

289

### Catalog Interface Implementation

290

291

```java

292

public class CustomCatalog implements Catalog {

293

private final String catalogName;

294

private final String defaultDatabase;

295

296

public CustomCatalog(String catalogName, String defaultDatabase) {

297

this.catalogName = catalogName;

298

this.defaultDatabase = defaultDatabase;

299

}

300

301

@Override

302

public void open() throws CatalogException {

303

// Initialize catalog connection

304

}

305

306

@Override

307

public void close() throws CatalogException {

308

// Clean up resources

309

}

310

311

@Override

312

public String getDefaultDatabase() throws CatalogException {

313

return defaultDatabase;

314

}

315

316

@Override

317

public boolean databaseExists(String databaseName) throws CatalogException {

318

// Implementation to check database existence

319

return checkDatabaseExists(databaseName);

320

}

321

322

@Override

323

public List<String> listDatabases() throws CatalogException {

324

// Implementation to list databases

325

return getDatabaseList();

326

}

327

328

@Override

329

public boolean tableExists(ObjectPath tablePath) throws CatalogException {

330

// Implementation to check table existence

331

return checkTableExists(tablePath);

332

}

333

334

@Override

335

public List<String> listTables(String databaseName) throws CatalogException {

336

// Implementation to list tables

337

return getTableList(databaseName);

338

}

339

340

@Override

341

public CatalogBaseTable getTable(ObjectPath tablePath) throws CatalogException {

342

// Implementation to get table metadata

343

return loadTableMetadata(tablePath);

344

}

345

346

// Implement other required methods...

347

}

348

```

349

350

## Multi-Catalog Queries

351

352

### Cross-Catalog Operations

353

354

```sql

355

-- Query across multiple catalogs

356

SELECT

357

h.user_id,

358

h.purchase_amount,

359

p.user_name,

360

p.email

361

FROM myhive.sales.purchases h

362

JOIN mypg.users.profiles p ON h.user_id = p.user_id

363

WHERE h.purchase_date >= CURRENT_DATE - INTERVAL '7' DAY;

364

365

-- Insert from one catalog to another

366

INSERT INTO myhive.warehouse.sales_summary

367

SELECT

368

user_id,

369

SUM(amount) as total_amount,

370

COUNT(*) as transaction_count

371

FROM mypg.transactional.orders

372

WHERE order_date >= CURRENT_DATE - INTERVAL '1' DAY

373

GROUP BY user_id;

374

```

375

376

### Catalog Resolution

377

378

```java

379

// Fully qualified table names

380

Table hiveTable = tEnv.from("myhive.production.user_events");

381

Table pgTable = tEnv.from("mypg.analytics.user_profiles");

382

383

// Join across catalogs

384

Table joined = hiveTable

385

// Table API columns are referenced by unqualified name; the two joined tables must not share column names
.join(pgTable, $("user_id").isEqual($("id")))

386

.select($("user_id"), $("event_type"), $("name"), $("email"));

387

```

388

389

## Configuration

390

391

### Catalog Configuration

392

393

```java

394

// HiveCatalog has no Map-based constructor: pass metastore settings through a HiveConf
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConf, "2.3.4");

400

401

// JDBC catalogs take only credentials and a base URL (without a database name);
// the constructors documented above accept no extra connection or pool properties
PostgresCatalog pgCatalog = new PostgresCatalog("mypg", "postgres", "user", "pass", "jdbc:postgresql://localhost:5432");

407

```

408

409

### Metadata Cache Configuration

410

411

```java

412

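// NOTE: the option keys below are illustrative and may not be recognized by your Flink version;
// check them against that version's configuration reference before relying on them.
Configuration config = tEnv.getConfig().getConfiguration();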

413

414

// Configure metadata cache

415

config.setString("table.catalog.cache.expiration-time", "10 min");

416

config.setBoolean("table.catalog.cache.enabled", true);

417

418

// Configure Hive metastore client

419

config.setString("table.catalog.hive.metastore.client.factory", "org.apache.hadoop.hive.metastore.HiveMetaStoreClientFactory");

420

```

421

422

## Error Handling

423

424

```java { .api }

425

class CatalogException extends RuntimeException { // unchecked

426

CatalogException(String message);

427

CatalogException(String message, Throwable cause);

428

}

429

430

// Checked exceptions declared by Catalog methods; they extend Exception, not CatalogException
class DatabaseNotExistException extends Exception {}

431

class DatabaseAlreadyExistException extends Exception {}

432

class TableNotExistException extends Exception {}

433

class TableAlreadyExistException extends Exception {}

434

```

435

436

## Types

437

438

```java { .api }

439

interface CatalogDatabase {

440

Map<String, String> getProperties();

441

String getComment();

442

CatalogDatabase copy();

443

CatalogDatabase copy(Map<String, String> properties);

444

Optional<String> getDescription();

445

Optional<String> getDetailedDescription();

446

}

447

448

interface CatalogBaseTable {

449

Map<String, String> getOptions();

450

String getComment();

451

CatalogBaseTable copy(Map<String, String> options);

452

Optional<String> getDescription();

453

Optional<String> getDetailedDescription();

454

TableSchema getSchema(); // Deprecated

455

Schema getUnresolvedSchema();

456

}

457

458

interface CatalogTable extends CatalogBaseTable {

459

// Legacy, TableSchema-based: new CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment)

460

static CatalogTable of(Schema schema, String comment, List<String> partitionKeys, Map<String, String> options);

461

462

boolean isPartitioned();

463

List<String> getPartitionKeys();

464

}

465

466

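interface CatalogView extends CatalogBaseTable {

467

static CatalogView of(Schema schema, String comment, String originalQuery, String expandedQuery, Map<String, String> options);
// Legacy, TableSchema-based: new CatalogViewImpl(String originalQuery, String expandedQuery, TableSchema schema, Map<String, String> properties, String comment)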

468

469

String getOriginalQuery();

470

String getExpandedQuery();

471

}

472

473

interface CatalogFunction {

474

String getClassName();

475

FunctionLanguage getFunctionLanguage();

476

List<ResourceUri> getFunctionResources();

477

}

478

479

enum FunctionLanguage {

480

JAVA,

481

PYTHON,

482

SCALA

483

}

484

```