or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-integration.md · configuration.md · function-module.md · index.md · partition-management.md · table-source-sink.md

catalog-integration.md · docs/

0

# Catalog Integration

1

2

The catalog integration provides complete Hive metastore integration, covering schema discovery, table management, and metadata operations. It enables transparent access to existing Hive data warehouses, with full support for databases, tables, partitions, and metadata management.

3

4

## Capabilities

5

6

### HiveCatalog

7

8

`HiveCatalog` is the main catalog implementation; it integrates with the Hive metastore to provide comprehensive metadata operations.

9

10

```java { .api }

11

/**

12

* Hive catalog implementation providing full metastore integration

13

* Extends AbstractCatalog to provide Flink-compatible catalog operations

14

*/

15

public class HiveCatalog extends AbstractCatalog {

16

17

/**

18

* Create HiveCatalog instance

19

* @param catalogName Name for this catalog instance

20

* @param defaultDatabase Default database to use

21

* @param hiveConfDir Directory containing hive-site.xml (optional)

22

* @param hadoopConfDir Directory containing Hadoop configuration (optional)

23

* @param hiveVersion Hive version for compatibility (e.g., "2.3.9")

24

*/

25

public HiveCatalog(String catalogName, String defaultDatabase,

26

String hiveConfDir, String hadoopConfDir, String hiveVersion);

27

28

// Database operations

29

/**

30

* List all databases in the Hive metastore

31

* @return List of database names

32

* @throws DatabaseNotExistException If catalog is not accessible

33

* @throws CatalogException If operation fails

34

*/

35

public List<String> listDatabases() throws DatabaseNotExistException, CatalogException;

36

37

/**

38

* Check if database exists

39

* @param databaseName Database name to check

40

* @return true if database exists

41

* @throws CatalogException If operation fails

42

*/

43

public boolean databaseExists(String databaseName) throws CatalogException;

44

45

/**

46

* Create new database

47

* @param databaseName Name of database to create

48

* @param database Database metadata

49

* @param ignoreIfExists Whether to ignore if database already exists

50

* @throws DatabaseAlreadyExistException If database exists and ignoreIfExists is false

51

* @throws CatalogException If operation fails

52

*/

53

public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)

54

throws DatabaseAlreadyExistException, CatalogException;

55

56

/**

57

* Get database metadata

58

* @param databaseName Database name

59

* @return Database metadata

60

* @throws DatabaseNotExistException If database doesn't exist

61

* @throws CatalogException If operation fails

62

*/

63

public CatalogDatabase getDatabase(String databaseName)

64

throws DatabaseNotExistException, CatalogException;

65

66

// Table operations

67

/**

68

* List all tables in a database

69

* @param databaseName Database name

70

* @return List of table names

71

* @throws DatabaseNotExistException If database doesn't exist

72

* @throws CatalogException If operation fails

73

*/

74

public List<String> listTables(String databaseName)

75

throws DatabaseNotExistException, CatalogException;

76

77

/**

78

* List tables matching a pattern

79

* @param databaseName Database name

80

* @param tableNamePattern SQL-like pattern for table names

81

* @return List of matching table names

82

* @throws DatabaseNotExistException If database doesn't exist

83

* @throws CatalogException If operation fails

84

*/

85

public List<String> listTables(String databaseName, String tableNamePattern)

86

throws DatabaseNotExistException, CatalogException;

87

88

/**

89

* Check if table exists

90

* @param tablePath Table path (database.table)

91

* @return true if table exists

92

* @throws CatalogException If operation fails

93

*/

94

public boolean tableExists(ObjectPath tablePath) throws CatalogException;

95

96

/**

97

* Get table metadata and schema

98

* @param tablePath Table path (database.table)

99

* @return Table metadata including schema

100

* @throws TableNotExistException If table doesn't exist

101

* @throws CatalogException If operation fails

102

*/

103

public CatalogBaseTable getTable(ObjectPath tablePath)

104

throws TableNotExistException, CatalogException;

105

106

/**

107

* Create new table

108

* @param tablePath Table path (database.table)

109

* @param table Table metadata and schema

110

* @param ignoreIfExists Whether to ignore if table already exists

111

* @throws TableAlreadyExistException If table exists and ignoreIfExists is false

112

* @throws DatabaseNotExistException If database doesn't exist

113

* @throws CatalogException If operation fails

114

*/

115

public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)

116

throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

117

118

/**

119

* Drop table

120

* @param tablePath Table path (database.table)

121

* @param ignoreIfNotExists Whether to ignore if table doesn't exist

122

* @throws TableNotExistException If table doesn't exist and ignoreIfNotExists is false

123

* @throws CatalogException If operation fails

124

*/

125

public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)

126

throws TableNotExistException, CatalogException;

127

128

/**

129

* Rename table

130

* @param tablePath Current table path

131

* @param newTableName New table name

132

* @throws TableNotExistException If table doesn't exist

133

* @throws TableAlreadyExistException If new name already exists

134

* @throws CatalogException If operation fails

135

*/

136

public void renameTable(ObjectPath tablePath, String newTableName)

137

throws TableNotExistException, TableAlreadyExistException, CatalogException;

138

139

// Partition operations

140

/**

141

* List all partitions for a partitioned table

142

* @param tablePath Table path (database.table)

143

* @return List of partition specifications

144

* @throws TableNotExistException If table doesn't exist

145

* @throws TableNotPartitionedException If table is not partitioned

146

* @throws CatalogException If operation fails

147

*/

148

public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)

149

throws TableNotExistException, TableNotPartitionedException, CatalogException;

150

151

/**

152

* List partitions matching partial specification

153

* @param tablePath Table path (database.table)

154

* @param partitionSpec Partial partition specification

155

* @return List of matching partition specifications

156

* @throws TableNotExistException If table doesn't exist

157

* @throws TableNotPartitionedException If table is not partitioned

158

* @throws PartitionSpecInvalidException If partition spec is invalid

159

* @throws CatalogException If operation fails

160

*/

161

public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,

162

CatalogPartitionSpec partitionSpec)

163

throws TableNotExistException, TableNotPartitionedException,

164

PartitionSpecInvalidException, CatalogException;

165

166

/**

167

* Get partition metadata

168

* @param tablePath Table path (database.table)

169

* @param partitionSpec Partition specification

170

* @return Partition metadata

171

* @throws PartitionNotExistException If partition doesn't exist

172

* @throws CatalogException If operation fails

173

*/

174

public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

175

throws PartitionNotExistException, CatalogException;

176

177

/**

178

* Check if partition exists

179

* @param tablePath Table path (database.table)

180

* @param partitionSpec Partition specification

181

* @return true if partition exists

182

* @throws CatalogException If operation fails

183

*/

184

public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)

185

throws CatalogException;

186

187

/**

188

* Create new partition

189

* @param tablePath Table path (database.table)

190

* @param partitionSpec Partition specification

191

* @param partition Partition metadata

192

* @param ignoreIfExists Whether to ignore if partition already exists

193

* @throws TableNotExistException If table doesn't exist

194

* @throws TableNotPartitionedException If table is not partitioned

195

* @throws PartitionSpecInvalidException If partition spec is invalid

196

* @throws PartitionAlreadyExistsException If partition exists and ignoreIfExists is false

197

* @throws CatalogException If operation fails

198

*/

199

public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,

200

CatalogPartition partition, boolean ignoreIfExists)

201

throws TableNotExistException, TableNotPartitionedException,

202

PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException;

203

204

/**

205

* Drop partition

206

* @param tablePath Table path (database.table)

207

* @param partitionSpec Partition specification

208

* @param ignoreIfNotExists Whether to ignore if partition doesn't exist

209

* @throws PartitionNotExistException If partition doesn't exist and ignoreIfNotExists is false

210

* @throws CatalogException If operation fails

211

*/

212

public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,

213

boolean ignoreIfNotExists)

214

throws PartitionNotExistException, CatalogException;

215

216

// Function operations

217

/**

218

* List user-defined functions in database

219

* @param databaseName Database name

220

* @return List of function names

221

* @throws DatabaseNotExistException If database doesn't exist

222

* @throws CatalogException If operation fails

223

*/

224

public List<String> listFunctions(String databaseName)

225

throws DatabaseNotExistException, CatalogException;

226

227

/**

228

* Get function metadata

229

* @param functionPath Function path (database.function)

230

* @return Function metadata

231

* @throws FunctionNotExistException If function doesn't exist

232

* @throws CatalogException If operation fails

233

*/

234

public CatalogFunction getFunction(ObjectPath functionPath)

235

throws FunctionNotExistException, CatalogException;

236

237

/**

238

* Check if function exists

239

* @param functionPath Function path (database.function)

240

* @return true if function exists

241

* @throws CatalogException If operation fails

242

*/

243

public boolean functionExists(ObjectPath functionPath) throws CatalogException;

244

245

/**

246

* Create user-defined function

247

* @param functionPath Function path (database.function)

248

* @param function Function metadata

249

* @param ignoreIfExists Whether to ignore if function already exists

250

* @throws FunctionAlreadyExistException If function exists and ignoreIfExists is false

251

* @throws DatabaseNotExistException If database doesn't exist

252

* @throws CatalogException If operation fails

253

*/

254

public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)

255

throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException;

256

257

/**

258

* Alter function metadata

259

* @param functionPath Function path (database.function)

260

* @param newFunction New function metadata

261

* @param ignoreIfNotExists Whether to ignore if function doesn't exist

262

* @throws FunctionNotExistException If function doesn't exist and ignoreIfNotExists is false

263

* @throws CatalogException If operation fails

264

*/

265

public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)

266

throws FunctionNotExistException, CatalogException;

267

268

/**

269

* Drop function

270

* @param functionPath Function path (database.function)

271

* @param ignoreIfNotExists Whether to ignore if function doesn't exist

272

* @throws FunctionNotExistException If function doesn't exist and ignoreIfNotExists is false

273

* @throws CatalogException If operation fails

274

*/

275

public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)

276

throws FunctionNotExistException, CatalogException;

277

278

// Statistics operations

279

/**

280

* Get table statistics for cost-based optimization

281

* @param tablePath Table path (database.table)

282

* @return Table statistics

283

* @throws TableNotExistException If table doesn't exist

284

* @throws CatalogException If operation fails

285

*/

286

public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)

287

throws TableNotExistException, CatalogException;

288

289

/**

290

* Get column statistics for cost-based optimization

291

* @param tablePath Table path (database.table)

292

* @return Column statistics

293

* @throws TableNotExistException If table doesn't exist

294

* @throws CatalogException If operation fails

295

*/

296

public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)

297

throws TableNotExistException, CatalogException;

298

299

/**

300

* Get partition statistics

301

* @param tablePath Table path (database.table)

302

* @param partitionSpec Partition specification

303

* @return Partition statistics

304

* @throws PartitionNotExistException If partition doesn't exist

305

* @throws CatalogException If operation fails

306

*/

307

public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath,

308

CatalogPartitionSpec partitionSpec)

309

throws PartitionNotExistException, CatalogException;

310

}

311

```

312

313

**Usage Examples:**

314

315

```java

316

import org.apache.flink.table.catalog.hive.HiveCatalog;

317

import org.apache.flink.table.catalog.ObjectPath;

318

319

// Create and configure Hive catalog

320

String catalogName = "production_hive";

321

String defaultDatabase = "analytics";

322

String hiveConfDir = "/etc/hive/conf"; // Contains hive-site.xml

323

String hadoopConfDir = "/etc/hadoop/conf"; // Contains core-site.xml, hdfs-site.xml

324

String hiveVersion = "2.3.9";

325

326

HiveCatalog catalog = new HiveCatalog(

327

catalogName,

328

defaultDatabase,

329

hiveConfDir,

330

hadoopConfDir,

331

hiveVersion

332

);

333

334

// Register with Table Environment

335

TableEnvironment tableEnv = TableEnvironment.create(settings);

336

tableEnv.registerCatalog("hive", catalog);

337

tableEnv.useCatalog("hive");

338

339

// Database operations

340

List<String> databases = catalog.listDatabases();

341

System.out.println("Available databases: " + databases);

342

343

boolean dbExists = catalog.databaseExists("user_data");

344

if (!dbExists) {

345

CatalogDatabase newDb = new CatalogDatabaseImpl(

346

Map.of("description", "User analytics data"),

347

"Database for user analytics"

348

);

349

catalog.createDatabase("user_data", newDb, false);

350

}

351

352

// Table operations

353

ObjectPath tablePath = new ObjectPath("user_data", "events");

354

if (catalog.tableExists(tablePath)) {

355

CatalogBaseTable table = catalog.getTable(tablePath);

356

TableSchema schema = table.getSchema();

357

System.out.println("Table schema: " + schema);

358

359

// List partitions if table is partitioned

360

if (table instanceof CatalogTable) {

361

CatalogTable catalogTable = (CatalogTable) table;

362

if (catalogTable.isPartitioned()) {

363

List<CatalogPartitionSpec> partitions = catalog.listPartitions(tablePath);

364

System.out.println("Partitions: " + partitions.size());

365

}

366

}

367

}

368

369

// Query through catalog

370

Table result = tableEnv.sqlQuery("SELECT * FROM hive.user_data.events WHERE event_date = '2024-01-01'");

371

```

372

373