or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-integration.md configuration-management.md factory-registration.md function-module.md index.md lookup-joins.md table-sources-sinks.md

catalog-integration.md docs/

0

# Catalog Integration

1

2

Hive catalog implementation that provides unified metadata management between Flink and Hive systems. The HiveCatalog enables Flink to seamlessly access Hive databases, tables, partitions, views, and functions through the Hive metastore, maintaining consistency across both ecosystems.

3

4

## Capabilities

5

6

### Hive Catalog

7

8

Primary catalog implementation that integrates with Hive metastore for comprehensive metadata management.

9

10

```java { .api }

11

/**

12

* Catalog implementation for Hive metastore integration

13

* Provides unified metadata management for databases, tables, partitions, and functions

14

*/

15

public class HiveCatalog extends AbstractCatalog {

16

17

/**

18

* Creates a new HiveCatalog instance

19

* @param catalogName Name of the catalog in Flink

20

* @param defaultDatabase Default database name (typically "default")

21

* @param hiveConfDir Path to directory containing hive-site.xml

22

* @param hiveVersion Hive version for compatibility (e.g., "3.1.2")

23

*/

24

public HiveCatalog(String catalogName, String defaultDatabase,

25

String hiveConfDir, String hiveVersion);

26

27

/**

28

* Creates HiveCatalog with additional Hadoop configuration

29

* @param catalogName Name of the catalog in Flink

30

* @param defaultDatabase Default database name

31

* @param hiveConfDir Path to Hive configuration directory

32

* @param hadoopConfDir Path to Hadoop configuration directory

33

* @param hiveVersion Hive version for compatibility

34

*/

35

public HiveCatalog(String catalogName, String defaultDatabase,

36

String hiveConfDir, String hadoopConfDir, String hiveVersion);

37

38

/**

39

* Opens the catalog and establishes metastore connection

40

* @throws CatalogException if connection fails

41

*/

42

public void open() throws CatalogException;

43

44

/**

45

* Closes the catalog and releases resources

46

* @throws CatalogException if close operation fails

47

*/

48

public void close() throws CatalogException;

49

50

// Database Operations

51

52

/**

53

* Lists all databases in the catalog

54

* @return List of database names

55

* @throws CatalogException if operation fails

56

*/

57

public List<String> listDatabases() throws CatalogException;

58

59

/**

60

* Checks if a database exists

61

* @param databaseName Name of the database

62

* @return True if database exists, false otherwise

63

* @throws CatalogException if operation fails

64

*/

65

public boolean databaseExists(String databaseName) throws CatalogException;

66

67

/**

68

* Gets database metadata

69

* @param databaseName Name of the database

70

* @return CatalogDatabase containing metadata

71

* @throws DatabaseNotExistException if database doesn't exist

72

* @throws CatalogException if operation fails

73

*/

74

public CatalogDatabase getDatabase(String databaseName)

75

throws DatabaseNotExistException, CatalogException;

76

77

/**

78

* Creates a new database

79

* @param databaseName Name of the database to create

80

* @param database Database metadata

81

* @param ignoreIfExists Whether to ignore if database already exists

82

* @throws DatabaseAlreadyExistException if database exists and ignoreIfExists is false

83

* @throws CatalogException if operation fails

84

*/

85

public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)

86

throws DatabaseAlreadyExistException, CatalogException;

87

88

/**

89

* Drops an existing database

90

* @param databaseName Name of the database to drop

91

* @param ignoreIfNotExists Whether to ignore if database doesn't exist

92

* @param cascade Whether to drop tables in the database

93

* @throws DatabaseNotExistException if database doesn't exist and ignoreIfNotExists is false

94

* @throws DatabaseNotEmptyException if database is not empty and cascade is false

95

* @throws CatalogException if operation fails

96

*/

97

public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)

98

throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException;

99

100

// Table Operations

101

102

/**

103

* Lists all tables in a database

104

* @param databaseName Name of the database

105

* @return List of table names

106

* @throws DatabaseNotExistException if database doesn't exist

107

* @throws CatalogException if operation fails

108

*/

109

public List<String> listTables(String databaseName)

110

throws DatabaseNotExistException, CatalogException;

111

112

/**

113

* Checks if a table exists

114

* @param tablePath Path identifying the table (database.table)

115

* @return True if table exists, false otherwise

116

* @throws CatalogException if operation fails

117

*/

118

public boolean tableExists(ObjectPath tablePath) throws CatalogException;

119

120

/**

121

* Gets table metadata

122

* @param tablePath Path identifying the table

123

* @return CatalogBaseTable containing metadata

124

* @throws TableNotExistException if table doesn't exist

125

* @throws CatalogException if operation fails

126

*/

127

public CatalogBaseTable getTable(ObjectPath tablePath)

128

throws TableNotExistException, CatalogException;

129

130

/**

131

* Creates a new table

132

* @param tablePath Path identifying the table

133

* @param table Table metadata

134

* @param ignoreIfExists Whether to ignore if table already exists

135

* @throws TableAlreadyExistException if table exists and ignoreIfExists is false

136

* @throws DatabaseNotExistException if database doesn't exist

137

* @throws CatalogException if operation fails

138

*/

139

public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)

140

throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

141

142

/**

143

* Drops an existing table

144

* @param tablePath Path identifying the table

145

* @param ignoreIfNotExists Whether to ignore if table doesn't exist

146

* @throws TableNotExistException if table doesn't exist and ignoreIfNotExists is false

147

* @throws CatalogException if operation fails

148

*/

149

public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)

150

throws TableNotExistException, CatalogException;

151

152

// Partition Operations

153

154

/**

155

* Lists all partitions of a table

156

* @param tablePath Path identifying the table

157

* @return List of partition specifications

158

* @throws TableNotExistException if table doesn't exist

159

* @throws TableNotPartitionedException if table is not partitioned

160

* @throws CatalogException if operation fails

161

*/

162

public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)

163

throws TableNotExistException, TableNotPartitionedException, CatalogException;

164

165

/**

166

* Gets partition metadata

167

* @param tablePath Path identifying the table

168

* @param partitionSpec Partition specification

169

* @return CatalogPartition containing metadata

170

* @throws PartitionNotExistException if partition doesn't exist

171

* @throws CatalogException if operation fails

172

*/

173

public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

174

throws PartitionNotExistException, CatalogException;

175

176

/**

177

* Checks if a partition exists

178

* @param tablePath Path identifying the table

179

* @param partitionSpec Partition specification

180

* @return True if partition exists, false otherwise

181

* @throws CatalogException if operation fails

182

*/

183

public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

184

throws CatalogException;

185

186

// Function Operations

187

188

/**

189

* Lists all functions in a database

190

* @param databaseName Name of the database

191

* @return List of function names

192

* @throws DatabaseNotExistException if database doesn't exist

193

* @throws CatalogException if operation fails

194

*/

195

public List<String> listFunctions(String databaseName)

196

throws DatabaseNotExistException, CatalogException;

197

198

/**

199

* Gets function metadata

200

* @param functionPath Path identifying the function

201

* @return CatalogFunction containing metadata

202

* @throws FunctionNotExistException if function doesn't exist

203

* @throws CatalogException if operation fails

204

*/

205

public CatalogFunction getFunction(ObjectPath functionPath)

206

throws FunctionNotExistException, CatalogException;

207

208

/**

209

* Checks if a function exists

210

* @param functionPath Path identifying the function

211

* @return True if function exists, false otherwise

212

* @throws CatalogException if operation fails

213

*/

214

public boolean functionExists(ObjectPath functionPath) throws CatalogException;

215

216

/**

217

* Creates a new function

218

* @param functionPath Path identifying the function

219

* @param function Function metadata

220

* @param ignoreIfExists Whether to ignore if function already exists

221

* @throws FunctionAlreadyExistException if function exists and ignoreIfExists is false

222

* @throws DatabaseNotExistException if database doesn't exist

223

* @throws CatalogException if operation fails

224

*/

225

public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)

226

throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

227

228

/**

229

* Drops an existing function

230

* @param functionPath Path identifying the function

231

* @param ignoreIfNotExists Whether to ignore if function doesn't exist

232

* @throws FunctionNotExistException if function doesn't exist and ignoreIfNotExists is false

233

* @throws CatalogException if operation fails

234

*/

235

public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)

236

throws FunctionNotExistException, CatalogException;

237

238

/**

239

* Alters an existing function

240

* @param functionPath Path identifying the function

241

* @param newFunction New function metadata

242

* @param ignoreIfNotExists Whether to ignore if function doesn't exist

243

* @throws FunctionNotExistException if function doesn't exist and ignoreIfNotExists is false

244

* @throws CatalogException if operation fails

245

*/

246

public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)

247

throws FunctionNotExistException, CatalogException;

248

249

// Additional Table Operations

250

251

/**

252

* Lists all views in a database

253

* @param databaseName Name of the database

254

* @return List of view names

255

* @throws DatabaseNotExistException if database doesn't exist

256

* @throws CatalogException if operation fails

257

*/

258

public List<String> listViews(String databaseName)

259

throws DatabaseNotExistException, CatalogException;

260

261

/**

262

* Renames an existing table

263

* @param tablePath Current path of the table

264

* @param newTableName New name for the table

265

* @param ignoreIfNotExists Whether to ignore if table doesn't exist

266

* @throws TableNotExistException if table doesn't exist and ignoreIfNotExists is false

267

* @throws TableAlreadyExistException if new table name already exists

268

* @throws CatalogException if operation fails

269

*/

270

public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)

271

throws TableNotExistException, TableAlreadyExistException, CatalogException;

272

273

/**

274

* Alters an existing table

275

* @param tablePath Path identifying the table

276

* @param newCatalogTable New table metadata

277

* @param ignoreIfNotExists Whether to ignore if table doesn't exist

278

* @throws TableNotExistException if table doesn't exist and ignoreIfNotExists is false

279

* @throws CatalogException if operation fails

280

*/

281

public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)

282

throws TableNotExistException, CatalogException;

283

284

/**

285

* Alters an existing database

286

* @param databaseName Name of the database

287

* @param newDatabase New database metadata

288

* @param ignoreIfNotExists Whether to ignore if database doesn't exist

289

* @throws DatabaseNotExistException if database doesn't exist and ignoreIfNotExists is false

290

* @throws CatalogException if operation fails

291

*/

292

public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)

293

throws DatabaseNotExistException, CatalogException;

294

295

// Additional Partition Operations

296

297

/**

298

* Lists partitions matching a partial partition specification

299

* @param tablePath Path identifying the table

300

* @param partitionSpec Partial partition specification

301

* @return List of matching partition specifications

302

* @throws TableNotExistException if table doesn't exist

303

* @throws TableNotPartitionedException if table is not partitioned

304

* @throws PartitionSpecInvalidException if partition spec is invalid

305

* @throws CatalogException if operation fails

306

*/

307

public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

308

throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException;

309

310

/**

311

* Lists partitions matching filter expressions

312

* @param tablePath Path identifying the table

313

* @param expressions Filter expressions

314

* @return List of matching partition specifications

315

* @throws TableNotExistException if table doesn't exist

316

* @throws TableNotPartitionedException if table is not partitioned

317

* @throws CatalogException if operation fails

318

*/

319

public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> expressions)

320

throws TableNotExistException, TableNotPartitionedException, CatalogException;

321

322

/**

323

* Creates a new partition

324

* @param tablePath Path identifying the table

325

* @param partitionSpec Partition specification

326

* @param partition Partition metadata

327

* @param ignoreIfExists Whether to ignore if partition already exists

328

* @throws TableNotExistException if table doesn't exist

329

* @throws TableNotPartitionedException if table is not partitioned

330

* @throws PartitionSpecInvalidException if partition spec is invalid

331

* @throws PartitionAlreadyExistsException if partition exists and ignoreIfExists is false

332

* @throws CatalogException if operation fails

333

*/

334

public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,

335

CatalogPartition partition, boolean ignoreIfExists)

336

throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException,

337

PartitionAlreadyExistsException, CatalogException;

338

339

/**

340

* Drops an existing partition

341

* @param tablePath Path identifying the table

342

* @param partitionSpec Partition specification

343

* @param ignoreIfNotExists Whether to ignore if partition doesn't exist

344

* @throws PartitionNotExistException if partition doesn't exist and ignoreIfNotExists is false

345

* @throws CatalogException if operation fails

346

*/

347

public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)

348

throws PartitionNotExistException, CatalogException;

349

350

/**

351

* Alters an existing partition

352

* @param tablePath Path identifying the table

353

* @param partitionSpec Partition specification

354

* @param newPartition New partition metadata

355

* @param ignoreIfNotExists Whether to ignore if partition doesn't exist

356

* @throws PartitionNotExistException if partition doesn't exist and ignoreIfNotExists is false

357

* @throws CatalogException if operation fails

358

*/

359

public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,

360

CatalogPartition newPartition, boolean ignoreIfNotExists)

361

throws PartitionNotExistException, CatalogException;

362

363

// Statistics Operations

364

365

/**

366

* Gets table statistics

367

* @param tablePath Path identifying the table

368

* @return Table statistics

369

* @throws TableNotExistException if table doesn't exist

370

* @throws CatalogException if operation fails

371

*/

372

public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)

373

throws TableNotExistException, CatalogException;

374

375

/**

376

* Gets table column statistics

377

* @param tablePath Path identifying the table

378

* @return Column statistics

379

* @throws TableNotExistException if table doesn't exist

380

* @throws CatalogException if operation fails

381

*/

382

public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)

383

throws TableNotExistException, CatalogException;

384

385

/**

386

* Gets partition statistics

387

* @param tablePath Path identifying the table

388

* @param partitionSpec Partition specification

389

* @return Partition statistics

390

* @throws PartitionNotExistException if partition doesn't exist

391

* @throws CatalogException if operation fails

392

*/

393

public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

394

throws PartitionNotExistException, CatalogException;

395

396

/**

397

* Gets partition column statistics

398

* @param tablePath Path identifying the table

399

* @param partitionSpec Partition specification

400

* @return Partition column statistics

401

* @throws PartitionNotExistException if partition doesn't exist

402

* @throws CatalogException if operation fails

403

*/

404

public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

405

throws PartitionNotExistException, CatalogException;

406

407

// Utility Methods

408

409

// Additional Constructors

410

411

/**

412

* Creates HiveCatalog with explicit HiveConf instance

413

* @param catalogName Name of the catalog in Flink

414

* @param defaultDatabase Default database name

415

* @param hiveConf Hive configuration instance

416

* @param hiveVersion Hive version for compatibility

417

*/

418

public HiveCatalog(String catalogName, String defaultDatabase,

419

HiveConf hiveConf, String hiveVersion);

420

421

/**

422

* Creates HiveCatalog with HiveConf and embedded metastore control

423

* @param catalogName Name of the catalog in Flink

424

* @param defaultDatabase Default database name

425

* @param hiveConf Hive configuration instance

426

* @param hiveVersion Hive version for compatibility

427

* @param allowEmbedded Whether to allow embedded metastore

428

*/

429

public HiveCatalog(String catalogName, String defaultDatabase,

430

HiveConf hiveConf, String hiveVersion, boolean allowEmbedded);

431

432

// Configuration and Access Methods

433

434

/**

435

* Gets the Hive configuration instance

436

* @return HiveConf instance used by this catalog

437

*/

438

public HiveConf getHiveConf();

439

440

/**

441

* Gets the Hive version string

442

* @return Hive version (e.g., "3.1.2")

443

*/

444

public String getHiveVersion();

445

446

/**

447

* Checks if catalog supports managed tables

448

* @return True if managed tables are supported

449

*/

450

public boolean supportsManagedTable();

451

452

/**

453

* Gets factory instance for table factory integration

454

* @return Optional Factory instance

455

*/

456

public Optional<Factory> getFactory();

457

458

/**

459

* Gets table factory for legacy integration

460

* @return Optional TableFactory instance

461

*/

462

public Optional<TableFactory> getTableFactory();

463

464

/**

465

* Gets function definition factory

466

* @return Optional FunctionDefinitionFactory instance

467

*/

468

public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory();

469

470

// Static Utility Methods

471

472

/**

473

* Creates HiveConf from configuration directories

474

* @param hiveConfDir Path to Hive configuration directory

475

* @param hadoopConfDir Path to Hadoop configuration directory

476

* @return Configured HiveConf instance

477

*/

478

public static HiveConf createHiveConf(String hiveConfDir, String hadoopConfDir);

479

480

/**

481

* Extracts field names from Hive field schema list

482

* @param fieldSchemas List of Hive field schemas

483

* @return List of field names

484

*/

485

public static List<String> getFieldNames(List<FieldSchema> fieldSchemas);

486

487

/**

488

* Determines if a table is a Hive table based on its properties

489

* @param properties Table properties map

490

* @return True if table is a Hive table, false otherwise

491

*/

492

public static boolean isHiveTable(Map<String, String> properties);

493

494

/**

495

* Determines if a table is a Hive table based on the table instance

496

* @param table CatalogBaseTable instance

497

* @return True if table is a Hive table, false otherwise

498

*/

499

public static boolean isHiveTable(CatalogBaseTable table);

500

501

/**

502

* Checks if the metastore is configured for embedded mode

503

* @param hiveConf Hive configuration to check

504

* @return True if using embedded metastore, false otherwise

505

*/

506

public static boolean isEmbeddedMetastore(HiveConf hiveConf);

507

508

// Hive-specific Operations (Internal API)

509

510

/**

511

* Gets the underlying Hive Table object

512

* @param tablePath Path identifying the table

513

* @return Hive Table instance

514

* @throws TableNotExistException if table doesn't exist

515

*/

516

public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException;

517

518

/**

519

* Loads data into a table from external location

520

* @param loadPath Path to data files

521

* @param tablePath Path identifying the target table

522

* @param isOverwrite Whether to overwrite existing data

523

* @param isSrcLocal Whether source is on local filesystem

524

*/

525

public void loadTable(Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal);

526

527

/**

528

* Loads data into a specific partition from external location

529

* @param loadPath Path to data files

530

* @param tablePath Path identifying the target table

531

* @param partSpec Partition specification

532

* @param isOverwrite Whether to overwrite existing data

533

* @param isSrcLocal Whether source is on local filesystem

534

*/

535

public void loadPartition(Path loadPath, ObjectPath tablePath, Map<String, String> partSpec,

536

boolean isOverwrite, boolean isSrcLocal);

537

}

538

```

539

540

**Usage Examples:**

541

542

```java

543

// Programmatic catalog creation and registration

544

import org.apache.flink.table.api.EnvironmentSettings;

545

import org.apache.flink.table.api.TableEnvironment;

546

import org.apache.flink.table.catalog.hive.HiveCatalog;

547

548

// Create table environment

549

EnvironmentSettings settings = EnvironmentSettings.newInstance().build();

550

TableEnvironment tableEnv = TableEnvironment.create(settings);

551

552

// Create and register Hive catalog

553

String catalogName = "hive_catalog";

554

String defaultDatabase = "default";

555

String hiveConfDir = "/opt/hive/conf"; // Contains hive-site.xml

556

String hiveVersion = "3.1.2";

557

558

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);

559

tableEnv.registerCatalog(catalogName, hive);

560

561

// Set as current catalog

562

tableEnv.useCatalog(catalogName);

563

564

// Use catalog operations

565

tableEnv.executeSql("SHOW DATABASES").print();

566

tableEnv.executeSql("SHOW TABLES").print();

567

tableEnv.executeSql("DESCRIBE hive_table").print();

568

```

569

570

```sql

571

-- SQL DDL catalog usage

572

CREATE CATALOG hive_catalog WITH (

573

'type' = 'hive',

574

'hive-conf-dir' = '/opt/hive/conf',

575

'hive-version' = '3.1.2',

576

'hadoop-conf-dir' = '/opt/hadoop/conf'

577

);

578

579

-- Switch to Hive catalog

580

USE CATALOG hive_catalog;

581

USE `default`;

582

583

-- Catalog operations

584

SHOW DATABASES;

585

SHOW TABLES;

586

DESCRIBE EXTENDED hive_table;

587

SHOW PARTITIONS hive_partitioned_table;

588

589

-- Create database and tables

590

CREATE DATABASE analytics_db;

591

USE analytics_db;

592

593

CREATE TABLE user_events (

594

user_id BIGINT,

595

event_type STRING,

596

event_time TIMESTAMP,

597

partition_date STRING

598

) PARTITIONED BY (partition_date)

599

STORED AS PARQUET;

600

```

601

602

### Cross-System Compatibility

603

604

The HiveCatalog maintains metadata consistency between Flink and Hive, enabling:

605

606

**Bidirectional Table Access:**

607

- Tables created in Hive are immediately accessible in Flink

608

- Tables created in Flink are visible to Hive clients

609

- Schema evolution is synchronized across both systems

610

611

**Metadata Consistency:**

612

- Table properties and configurations are preserved

613

- Partition metadata is kept in sync

614

- Storage format information is maintained

615

616

**Function Integration:**

617

- Hive UDFs can be called from Flink SQL

618

- Function metadata is shared between systems

619

- Version-specific function loading for compatibility

620

621

### Configuration Properties

622

623

```java { .api }

624

/**

625

* Configuration constants for Hive catalog

626

*/

627

public class HiveCatalogConfig {

628

/** Default separator for column type lists */

629

public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";

630

631

/** Property key for table comments */

632

public static final String COMMENT = "comment";

633

634

/** Property key for partition location */

635

public static final String PARTITION_LOCATION = "partition.location";

636

637

/** Property key for table location */

638

public static final String TABLE_LOCATION = "location";

639

640

/** Property key for storage format */

641

public static final String STORED_AS = "stored-as";

642

643

/** Property key for input format */

644

public static final String INPUT_FORMAT = "input-format";

645

646

/** Property key for output format */

647

public static final String OUTPUT_FORMAT = "output-format";

648

649

/** Property key for serialization library */

650

public static final String SERDE_LIB = "serde-lib";

651

}

652

```

653

654

### Error Handling

655

656

The catalog provides comprehensive error handling for common scenarios:

657

658

```java { .api }

659

// Common exceptions thrown by catalog operations

660

try {

661

CatalogBaseTable table = catalog.getTable(new ObjectPath("db", "table"));

662

} catch (TableNotExistException e) {

663

// Handle missing table

664

} catch (DatabaseNotExistException e) {

665

// Handle missing database

666

} catch (CatalogException e) {

667

// Handle general catalog errors (connectivity, permissions, etc.)

668

}

669

```

670

671

**Common Error Scenarios:**

672

- **Metastore Connectivity**: Network issues or authentication failures

673

- **Permission Errors**: Insufficient privileges for database/table operations

674

- **Schema Conflicts**: Type incompatibilities between Flink and Hive

675

- **Configuration Issues**: Invalid Hive configuration or missing files