or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro.mdhadoop.mdhbase.mdhcatalog.mdindex.mdjdbc.md

hcatalog.mddocs/

0

# HCatalog Connector

1

2

Apache Hive HCatalog metadata integration for Flink batch processing, enabling access to Hive tables with schema support, partition filtering, and automatic type mapping.

3

4

## Capabilities

5

6

### HCatInputFormatBase

7

8

Abstract base InputFormat for reading from HCatalog tables with comprehensive configuration options.

9

10

```java { .api }

11

/**

12

* Abstract base InputFormat for reading from HCatalog tables

13

* @param <T> The type of records produced (typically HCatRecord or Flink Tuple)

14

*/

15

public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>

16

implements ResultTypeQueryable<T> {

17

18

/**

19

* Default constructor using default database and table from context

20

*/

21

public HCatInputFormatBase();

22

23

/**

24

* Creates HCatInputFormatBase for a specific database and table

25

* @param database Name of the Hive database

26

* @param table Name of the Hive table

27

*/

28

public HCatInputFormatBase(String database, String table);

29

30

/**

31

* Creates HCatInputFormatBase with custom Hadoop configuration

32

* @param database Name of the Hive database

33

* @param table Name of the Hive table

34

* @param config Hadoop Configuration with HCatalog settings

35

*/

36

public HCatInputFormatBase(String database, String table, Configuration config);

37

38

/**

39

* Specifies which fields to return and their order

40

* @param fields Array of field names to include in the output

41

* @return This instance for method chaining

42

*/

43

public HCatInputFormatBase<T> getFields(String... fields);

44

45

/**

46

* Specifies partition filter condition for partition pruning

47

* @param filter Partition filter expression (e.g., "year=2023 AND month=12")

48

* @return This instance for method chaining

49

*/

50

public HCatInputFormatBase<T> withFilter(String filter);

51

52

/**

53

* Configures the format to return Flink tuples instead of HCatRecord

54

* @return This instance for method chaining

55

*/

56

public HCatInputFormatBase<T> asFlinkTuples();

57

58

/**

59

* Returns the Hadoop Configuration used by this format

60

* @return Hadoop Configuration instance

61

*/

62

public Configuration getConfiguration();

63

64

/**

65

* Returns the HCatalog schema for the output data

66

* @return HCatSchema describing the table structure

67

*/

68

public HCatSchema getOutputSchema();

69

70

/**

71

* Returns the type information for the records produced by this format

72

* @return TypeInformation describing the output type

73

*/

74

public TypeInformation<T> getProducedType();

75

76

/**

77

* Returns the maximum tuple size supported by this format implementation

78

* Subclasses define the specific limit (e.g., 25 for standard Java API)

79

* @return Maximum number of fields supported in Flink tuples

80

*/

81

protected abstract int getMaxFlinkTupleSize();

82

83

/**

84

* Builds a Flink tuple from an HCatRecord

85

* @param t The tuple instance to populate (may be reused)

86

* @param record The HCatRecord containing the data

87

* @return Populated Flink tuple

88

*/

89

protected abstract T buildFlinkTuple(T t, HCatRecord record);

90

}

91

```

92

93

### HCatInputFormat

94

95

Concrete HCatalog InputFormat for Java API with support for up to 25 tuple fields.

96

97

```java { .api }

98

/**

99

* Concrete HCatalog InputFormat for Java API with max 25 tuple fields

100

* @param <T> The Flink tuple type (Tuple1 through Tuple25)

101

*/

102

public class HCatInputFormat<T> extends HCatInputFormatBase<T> {

103

104

/**

105

* Default constructor using default database and table from context

106

*/

107

public HCatInputFormat();

108

109

/**

110

* Creates HCatInputFormat for a specific database and table

111

* @param database Name of the Hive database

112

* @param table Name of the Hive table

113

*/

114

public HCatInputFormat(String database, String table);

115

116

/**

117

* Creates HCatInputFormat with custom Hadoop configuration

118

* @param database Name of the Hive database

119

* @param table Name of the Hive table

120

* @param config Hadoop Configuration with HCatalog settings

121

*/

122

public HCatInputFormat(String database, String table, Configuration config);

123

}

124

```

125

126

**Basic Usage Example:**

127

128

```java

129

import org.apache.flink.api.java.ExecutionEnvironment;

130

import org.apache.flink.api.java.DataSet;

131

import org.apache.flink.hcatalog.java.HCatInputFormat;

132

import org.apache.flink.api.java.tuple.Tuple3;

133

134

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

135

136

// Read from Hive table

137

HCatInputFormat<Tuple3<String, Integer, String>> hcatInput =

138

new HCatInputFormat<>("mydb", "users");

139

140

// Configure which fields to read

141

hcatInput

142

.getFields("name", "age", "city") // Select specific columns

143

.withFilter("age > 18 AND city='New York'") // Partition filter — the referenced columns must be partition keys of the table

144

.asFlinkTuples(); // Return as Flink tuples

145

146

DataSet<Tuple3<String, Integer, String>> users = env.createInput(hcatInput);

147

users.print();

148

```

149

150

**Advanced Configuration Example:**

151

152

```java

153

import org.apache.hadoop.conf.Configuration;

154

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.aggregation.Aggregations;

155

156

// Configure Hadoop/Hive settings

157

Configuration hadoopConfig = new Configuration();

158

hadoopConfig.set("hive.metastore.uris", "thrift://localhost:9083");

159

hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");

160

161

// Create input format with custom configuration

162

HCatInputFormat<Tuple5<String, Integer, String, Double, Long>> salesInput =

163

new HCatInputFormat<>("sales_db", "transactions", hadoopConfig);

164

165

// Configure for complex query

166

salesInput

167

.getFields("customer_id", "quantity", "product", "amount", "timestamp")

168

.withFilter("year=2023 AND month>=10 AND region='US'") // Partition pruning

169

.asFlinkTuples();

170

171

DataSet<Tuple5<String, Integer, String, Double, Long>> sales = env.createInput(salesInput);

172

173

// Process sales data

174

DataSet<Tuple2<String, Double>> customerTotals = sales

175

.groupBy(0) // Group by customer_id

176

.aggregate(Aggregations.SUM, 3) // Sum amounts

177

.project(0, 3); // Keep customer_id and total

178

179

customerTotals.print();

180

```

181

182

## Schema Handling

183

184

### Automatic Type Mapping

185

186

HCatalog automatically maps Hive types to Flink types:

187

188

```java

189

// Hive Schema -> Flink Types

190

// STRING -> String

191

// INT -> Integer

192

// BIGINT -> Long

193

// DOUBLE -> Double

194

// BOOLEAN -> Boolean

195

// ARRAY<T> -> List<T>

196

// MAP<K,V> -> Map<K,V>

197

// STRUCT -> Complex types (limited support)

198

```

199

200

### Working with Complex Types

201

202

```java

203

import org.apache.hive.hcatalog.data.HCatRecord;

204

import org.apache.flink.api.java.tuple.Tuple4;

205

206

// For tables with complex types, you may need custom processing

207

public class ComplexHCatInputFormat extends HCatInputFormatBase<Tuple4<String, List<String>, Map<String, Integer>, String>> {

208

209

public ComplexHCatInputFormat(String database, String table) {

210

super(database, table);

211

}

212

213

@Override

214

protected int getMaxFlinkTupleSize() {

215

return 25; // Standard Java API limit

216

}

217

218

@Override

219

protected Tuple4<String, List<String>, Map<String, Integer>, String> buildFlinkTuple(

220

Tuple4<String, List<String>, Map<String, Integer>, String> t,

221

HCatRecord record) {

222

223

// Extract primitive fields

224

String id = (String) record.get("id");

225

String status = (String) record.get("status");

226

227

// Extract complex fields

228

@SuppressWarnings("unchecked")

229

List<String> tags = (List<String>) record.get("tags");

230

231

@SuppressWarnings("unchecked")

232

Map<String, Integer> metrics = (Map<String, Integer>) record.get("metrics");

233

234

return new Tuple4<>(id, tags, metrics, status);

235

}

236

}

237

```

238

239

## Partition Management

240

241

### Partition Filtering

242

243

Efficient partition pruning reduces data processing overhead:

244

245

```java

246

// Partition filter examples

247

hcatInput.withFilter("year=2023"); // Single partition

248

hcatInput.withFilter("year=2023 AND month=12"); // Multiple partitions

249

hcatInput.withFilter("year>=2022 AND region IN ('US','EU')"); // Range and set filters

250

hcatInput.withFilter("year=2023 AND month>=6 AND month<=12"); // Range filters

251

```

252

253

### Dynamic Partition Discovery

254

255

```java

256

import org.apache.flink.api.java.tuple.Tuple6;

257

258

// Read from partitioned table with partition columns included

259

HCatInputFormat<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedInput =

260

new HCatInputFormat<>("warehouse", "sales_partitioned");

261

262

partitionedInput

263

.getFields("customer", "amount", "product", "region", "year", "month") // Include partition columns

264

.withFilter("year=2023 AND region IN ('US','EU')")

265

.asFlinkTuples();

266

267

DataSet<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedSales =

268

env.createInput(partitionedInput);

269

270

// Partition information is available in the data

271

partitionedSales

272

.filter(tuple -> tuple.f4 == 2023 && tuple.f5 >= 10) // Additional filtering by partition columns

273

.print();

274

```

275

276

## Performance Optimization

277

278

### Input Split Configuration

279

280

```java

281

// Configure input split size for better parallelism

282

Configuration config = new Configuration();

283

config.setLong("mapreduce.input.fileinputformat.split.maxsize", 134217728L); // 128MB splits

284

config.setInt("hive.exec.reducers.max", 200); // Maximum number of reducers

285

286

HCatInputFormat<Tuple3<String, Double, String>> optimizedInput =

287

new HCatInputFormat<>("bigdata", "large_table", config);

288

```

289

290

### Columnar Storage Integration

291

292

```java

293

// For ORC or Parquet tables, specify columns for projection pushdown

294

hcatInput

295

.getFields("id", "name", "amount") // Only read needed columns

296

.withFilter("year=2023"); // Partition pruning

297

298

// This enables:

299

// - Column projection (reduces I/O)

300

// - Partition pruning (reduces data scanned)

301

// - Predicate pushdown (when supported by storage format)

302

```

303

304

## Error Handling

305

306

```java

307

import org.apache.flink.api.common.functions.MapFunction;

308

import org.apache.flink.api.java.tuple.Tuple3;

309

310

// Handle potential null values and type conversion errors

311

DataSet<Tuple3<String, Integer, String>> safeUsers = users

312

.map(new MapFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>>() {

313

@Override

314

public Tuple3<String, Integer, String> map(Tuple3<String, Integer, String> value) {

315

// Handle null values

316

String name = value.f0 != null ? value.f0 : "Unknown";

317

Integer age = value.f1 != null ? value.f1 : 0;

318

String city = value.f2 != null ? value.f2 : "Unknown";

319

320

return new Tuple3<>(name, age, city);

321

}

322

});

323

```

324

325

## Common Types

326

327

```java { .api }

328

import org.apache.flink.api.common.io.RichInputFormat;

329

import org.apache.flink.api.common.typeinfo.TypeInformation;

330

import org.apache.flink.api.java.tuple.*;

331

import org.apache.hadoop.conf.Configuration;

332

import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;

333

import org.apache.hive.hcatalog.data.HCatRecord;

334

import org.apache.hive.hcatalog.data.schema.HCatSchema;

335

import java.util.List;

336

import java.util.Map;

337

```