or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-integration.mdcode-generation.mdexpression-system.mdfunction-integration.mdindex.mdplanner-factory.mdquery-planning.md

catalog-integration.mddocs/

# Catalog Integration

The catalog integration system provides seamless integration with Flink's catalog system for metadata management, table registration, schema information, and function discovery. It supports multiple catalog types, schema evolution, and metadata persistence across different storage systems.

## Capabilities

### CatalogManager Integration

Integration with Flink's central catalog management system for metadata operations:

```java { .api }
/**
 * Catalog manager integration (from flink-table-api-java)
 * Provides centralized catalog and database management
 */
public interface CatalogManager {

    /**
     * Registers a catalog with the catalog manager
     * @param catalogName Name of the catalog
     * @param catalog Catalog instance to register
     */
    void registerCatalog(String catalogName, Catalog catalog);

    /**
     * Gets a registered catalog by name
     * @param catalogName Name of the catalog
     * @return Optional catalog instance
     */
    Optional<Catalog> getCatalog(String catalogName);

    /**
     * Lists all registered catalog names
     * @return Set of catalog names
     */
    Set<String> listCatalogs();

    /**
     * Sets the current catalog
     * @param catalogName Name of catalog to set as current
     */
    void setCurrentCatalog(String catalogName);

    /**
     * Gets the current catalog name
     * @return Current catalog name
     */
    String getCurrentCatalog();

    /**
     * Sets the current database within current catalog
     * @param databaseName Database name to set as current
     */
    void setCurrentDatabase(String databaseName);

    /**
     * Gets the current database name
     * @return Current database name
     */
    String getCurrentDatabase();

    /**
     * Resolves object identifier to qualified name
     * @param identifier Object identifier to resolve
     * @return Fully qualified object identifier
     */
    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);
}
```

### FunctionCatalog Integration

Integration with Flink's function catalog for user-defined function management:

```java { .api }
/**
 * Function catalog integration (from flink-table-api-java)
 * Manages function registration, lookup, and resolution
 */
public interface FunctionCatalog {

    /**
     * Registers a temporary system function
     * @param name Function name
     * @param functionDefinition Function definition
     */
    void registerTemporarySystemFunction(String name, FunctionDefinition functionDefinition);

    /**
     * Registers a temporary catalog function
     * @param objectIdentifier Function identifier with catalog/database/name
     * @param functionDefinition Function definition
     * @param ignoreIfExists Whether to ignore if function already exists
     */
    void registerTemporaryCatalogFunction(
        ObjectIdentifier objectIdentifier,
        FunctionDefinition functionDefinition,
        boolean ignoreIfExists
    );

    /**
     * Drops a temporary system function
     * @param name Function name to drop
     * @return True if function was dropped, false if not found
     */
    boolean dropTemporarySystemFunction(String name);

    /**
     * Drops a temporary catalog function
     * @param identifier Function identifier
     * @param ignoreIfNotExist Whether to ignore if function doesn't exist
     * @return True if function was dropped
     */
    boolean dropTemporaryCatalogFunction(ObjectIdentifier identifier, boolean ignoreIfNotExist);

    /**
     * Looks up function by identifier
     * @param objectIdentifier Function identifier
     * @return Optional function lookup result
     */
    Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier objectIdentifier);

    /**
     * Lists user-defined functions in given catalog and database
     * @param catalogName Catalog name
     * @param databaseName Database name
     * @return Array of function names
     */
    String[] listFunctions(String catalogName, String databaseName);

    /**
     * Lists temporary functions
     * @return Array of temporary function names
     */
    String[] listTemporaryFunctions();
}
```

### Table Source and Sink Integration

Integration with table sources and sinks through the catalog system:

```scala { .api }
/**
 * Table source utilities for catalog integration
 */
object TableSourceUtil {

  /**
   * Validates table source capabilities and configuration
   * @param tableSource Table source to validate
   * @param schema Expected table schema
   * @throws ValidationException if source is invalid
   */
  def validateTableSource(tableSource: TableSource[_], schema: TableSchema): Unit

  /**
   * Creates dynamic table source from catalog table
   * @param catalogTable Catalog table definition
   * @param context Dynamic table context
   * @return Dynamic table source instance
   */
  def createDynamicTableSource(
    catalogTable: CatalogBaseTable,
    context: DynamicTableSource.Context
  ): DynamicTableSource

  /**
   * Extracts watermark strategy from table source
   * @param tableSource Table source with watermark information
   * @return Optional watermark strategy
   */
  def extractWatermarkStrategy(tableSource: TableSource[_]): Option[WatermarkStrategy[_]]

  /**
   * Validates partition information for partitioned sources
   * @param partitions Partition specifications
   * @param tableSchema Table schema with partition keys
   * @throws ValidationException if partitions are invalid
   */
  def validatePartitions(
    partitions: java.util.List[CatalogPartitionSpec],
    tableSchema: TableSchema
  ): Unit

  /**
   * Creates table source from legacy table factory
   * @param properties Table properties from catalog
   * @param isTemporary Whether this is a temporary table
   * @return Legacy table source instance
   */
  def createTableSource(
    properties: java.util.Map[String, String],
    isTemporary: Boolean
  ): TableSource[_]
}
```

### Schema Evolution Support

Support for schema evolution and compatibility checking:

```java { .api }
/**
 * Schema evolution utilities for catalog integration
 */
public class SchemaEvolutionUtil {

    /**
     * Checks schema compatibility between versions
     * @param oldSchema Previous schema version
     * @param newSchema New schema version
     * @return Compatibility check result
     */
    public static CompatibilityResult checkCompatibility(
        TableSchema oldSchema,
        TableSchema newSchema
    );

    /**
     * Evolves schema with backward compatibility
     * @param currentSchema Current schema
     * @param evolutionSpec Schema evolution specification
     * @return Evolved schema
     * @throws SchemaEvolutionException if evolution is not compatible
     */
    public static TableSchema evolveSchema(
        TableSchema currentSchema,
        SchemaEvolutionSpec evolutionSpec
    ) throws SchemaEvolutionException;

    /**
     * Validates column changes for compatibility
     * @param oldColumn Old column definition
     * @param newColumn New column definition
     * @return True if change is compatible
     */
    public static boolean isColumnChangeCompatible(
        TableColumn oldColumn,
        TableColumn newColumn
    );

    /**
     * Creates schema projection for subset of columns
     * @param originalSchema Original table schema
     * @param selectedFields Selected field names
     * @return Projected schema with selected fields only
     */
    public static TableSchema projectSchema(
        TableSchema originalSchema,
        List<String> selectedFields
    );
}
```

## Catalog Types and Configuration

### Built-in Catalog Support

Support for various catalog implementations:

```java { .api }
/**
 * Built-in catalog types supported by Flink
 */
public enum CatalogType {
    GENERIC_IN_MEMORY, // GenericInMemoryCatalog - for testing and temporary use
    HIVE,              // HiveCatalog - Apache Hive Metastore integration
    JDBC,              // JdbcCatalog - JDBC-based catalog storage
    ELASTICSEARCH,     // ElasticsearchCatalog - for Elasticsearch integration
    CUSTOM             // Custom catalog implementations
}

/**
 * Catalog configuration properties
 */
public class CatalogProperties {
    public static final String CATALOG_TYPE = "type";
    public static final String CATALOG_DEFAULT_DATABASE = "default-database";
    public static final String CATALOG_HIVE_CONF_DIR = "hive-conf-dir";
    public static final String CATALOG_HIVE_VERSION = "hive-version";
    public static final String CATALOG_JDBC_URL = "default-database.jdbc.url";
    public static final String CATALOG_JDBC_USERNAME = "default-database.jdbc.username";
    public static final String CATALOG_JDBC_PASSWORD = "default-database.jdbc.password";
}
```

### Catalog Factory Integration

Integration with Flink's catalog factory system:

```java { .api }
/**
 * Catalog factory for creating catalog instances
 */
public interface CatalogFactory extends Factory {

    /**
     * Creates catalog instance from configuration
     * @param context Factory context with configuration
     * @return Created catalog instance
     */
    Catalog createCatalog(Context context);

    /**
     * Returns factory identifier
     * @return Unique factory identifier
     */
    String factoryIdentifier();

    /**
     * Returns required configuration options
     * @return Set of required options
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options
     * @return Set of optional options
     */
    Set<ConfigOption<?>> optionalOptions();
}
```

## Metadata Management

### Table Metadata Operations

Operations for managing table metadata through catalogs:

```java
// Create table through catalog
CatalogTable catalogTable = CatalogTable.of(
    Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("ts", DataTypes.TIMESTAMP(3))
        .watermark("ts", "ts - INTERVAL '5' SECOND")
        .primaryKey("id")
        .build(),
    "Table for user data",
    Collections.emptyList(),
    tableProperties
);

ObjectPath tablePath = new ObjectPath("default_database", "users");
catalog.createTable(tablePath, catalogTable, false);

// Query table metadata
CatalogBaseTable table = catalog.getTable(tablePath);
TableSchema schema = table.getUnresolvedSchema().resolve(typeFactory);
```

### Database Operations

Database-level operations through catalog integration:

```java
// Create database
CatalogDatabase database = new CatalogDatabaseImpl(
    Collections.singletonMap("location", "/path/to/database"),
    "User database for analytics"
);

catalog.createDatabase("analytics_db", database, false);

// List databases and tables
List<String> databases = catalog.listDatabases();
List<String> tables = catalog.listTables("analytics_db");
```

### Partition Management

Support for partitioned table management:

```java { .api }
/**
 * Partition management through catalog integration
 */
public interface PartitionManager {

    /**
     * Creates partition in catalog table
     * @param tablePath Table object path
     * @param partitionSpec Partition specification
     * @param partition Partition metadata
     * @param ignoreIfExists Whether to ignore if partition exists
     */
    void createPartition(
        ObjectPath tablePath,
        CatalogPartitionSpec partitionSpec,
        CatalogPartition partition,
        boolean ignoreIfExists
    );

    /**
     * Lists partitions for table
     * @param tablePath Table object path
     * @return List of partition specifications
     */
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);

    /**
     * Gets partition metadata
     * @param tablePath Table object path
     * @param partitionSpec Partition specification
     * @return Partition metadata
     */
    CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec);

    /**
     * Drops partition from table
     * @param tablePath Table object path
     * @param partitionSpec Partition specification
     * @param ignoreIfNotExists Whether to ignore if partition doesn't exist
     */
    void dropPartition(
        ObjectPath tablePath,
        CatalogPartitionSpec partitionSpec,
        boolean ignoreIfNotExists
    );
}
```

## Advanced Catalog Features

### Statistics Integration

Integration with table and column statistics:

```java { .api }
/**
 * Statistics management through catalog
 */
public interface StatisticsManager {

    /**
     * Gets table statistics from catalog
     * @param objectPath Table path
     * @return Table statistics or empty if not available
     */
    Optional<CatalogTableStatistics> getTableStatistics(ObjectPath objectPath);

    /**
     * Gets column statistics from catalog
     * @param objectPath Table path
     * @return Column statistics or empty if not available
     */
    Optional<CatalogColumnStatistics> getTableColumnStatistics(ObjectPath objectPath);

    /**
     * Updates table statistics in catalog
     * @param objectPath Table path
     * @param tableStatistics Updated table statistics
     * @param ignoreIfNotExists Whether to ignore if table doesn't exist
     */
    void alterTableStatistics(
        ObjectPath objectPath,
        CatalogTableStatistics tableStatistics,
        boolean ignoreIfNotExists
    );

    /**
     * Updates column statistics in catalog
     * @param objectPath Table path
     * @param columnStatistics Updated column statistics
     * @param ignoreIfNotExists Whether to ignore if table doesn't exist
     */
    void alterTableColumnStatistics(
        ObjectPath objectPath,
        CatalogColumnStatistics columnStatistics,
        boolean ignoreIfNotExists
    );
}
```

### View Management

Support for view creation and management:

```java
// Create view through catalog
CatalogView catalogView = CatalogView.of(
    Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("user_name", DataTypes.STRING())
        .build(),
    "Active users view",
    "SELECT id as user_id, name as user_name FROM users WHERE active = true",
    Collections.emptyList(),
    "Comment for active users view"
);

ObjectPath viewPath = new ObjectPath("default_database", "active_users");
catalog.createTable(viewPath, catalogView, false);
```

## Error Handling and Validation

Common catalog integration error scenarios:

```java
try {
    // Catalog operations that may fail
    catalog.createTable(tablePath, catalogTable, false);
} catch (TableAlreadyExistException e) {
    // Handle table already exists
} catch (DatabaseNotExistException e) {
    // Handle database doesn't exist
} catch (CatalogException e) {
    // Handle general catalog errors
}

// Function catalog error handling
try {
    functionCatalog.registerTemporaryCatalogFunction(identifier, functionDef, false);
} catch (FunctionAlreadyExistException e) {
    // Handle function already exists
} catch (ValidationException e) {
    // Handle function validation errors
}
```

## Configuration Examples

### Hive Catalog Configuration

```java
Map<String, String> hiveProperties = new HashMap<>();
hiveProperties.put(CatalogProperties.CATALOG_TYPE, "hive");
hiveProperties.put(CatalogProperties.CATALOG_DEFAULT_DATABASE, "default");
hiveProperties.put(CatalogProperties.CATALOG_HIVE_CONF_DIR, "/opt/hive/conf");
hiveProperties.put(CatalogProperties.CATALOG_HIVE_VERSION, "3.1.2");

HiveCatalog hiveCatalog = new HiveCatalog("hive_catalog", "default", hiveConfDir);
catalogManager.registerCatalog("hive_catalog", hiveCatalog);
catalogManager.setCurrentCatalog("hive_catalog");
```

### JDBC Catalog Configuration

```java
Map<String, String> jdbcProperties = new HashMap<>();
jdbcProperties.put(CatalogProperties.CATALOG_TYPE, "jdbc");
jdbcProperties.put(CatalogProperties.CATALOG_DEFAULT_DATABASE, "analytics");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_URL, "jdbc:postgresql://localhost:5432/metadata");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_USERNAME, "catalog_user");
jdbcProperties.put(CatalogProperties.CATALOG_JDBC_PASSWORD, "catalog_password");

JdbcCatalog jdbcCatalog = new JdbcCatalog("jdbc_catalog", "analytics", jdbcUrl, username, password);
catalogManager.registerCatalog("jdbc_catalog", jdbcCatalog);
```