
# Flink HCatalog

Flink HCatalog is a connector library for Apache Flink that enables reading data from Apache Hive HCatalog tables. It provides both Java and Scala APIs with support for schema projection, partition filtering, and automatic type mapping between HCatalog schemas and Flink's type system.

## Package Information

- **Package Name**: flink-hcatalog_2.11
- **Package Type**: Maven
- **Language**: Java/Scala
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hcatalog_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

**Java:**

```java
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.hcatalog.HCatInputFormatBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.common.HCatException;
```

**Scala:**

```scala
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hive.hcatalog.data.HCatRecord
```

## Basic Usage

**Java Example:**

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create HCatalog input format
HCatInputFormat<HCatRecord> hCatFormat = new HCatInputFormat<>("mydb", "mytable");

// Optional: project specific fields
hCatFormat.getFields("name", "age", "city");

// Optional: apply a partition filter
hCatFormat.withFilter("year=2023 AND month='01'");

// Create DataSet and process
DataSet<HCatRecord> input = env.createInput(hCatFormat);
```

**Scala Example:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hive.hcatalog.data.HCatRecord

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create HCatalog input format with field projection and partition filter
val hCatFormat = new HCatInputFormat[HCatRecord]("mydb", "mytable")
  .getFields("name", "age", "city")
  .withFilter("year=2023")

// Create DataSet
val input = env.createInput(hCatFormat)
```

## Architecture

The Flink HCatalog connector is built around several key components:

- **Abstract Base Class**: `HCatInputFormatBase` provides common functionality for both the Java and Scala implementations
- **Language-Specific Implementations**: separate `HCatInputFormat` classes in the `java` and `scala` packages handle tuple conversion for each language
- **Type System Integration**: automatic mapping between HCatalog data types and Flink's type system
- **Hadoop Integration**: built on top of Hadoop's MapReduce input format framework for distributed data reading
- **Schema Management**: support for dynamic schema projection and for filtering at the partition level

## Capabilities

### Java HCatalog Input Format

Java implementation supporting Flink tuples of up to 25 fields, or raw `HCatRecord` output.

```java { .api }
/**
 * Java HCatalog input format for reading from Hive tables.
 * Supports conversion to Flink tuples (up to 25 fields) or HCatRecord objects.
 */
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
    public HCatInputFormat();
    public HCatInputFormat(String database, String table) throws Exception;
    public HCatInputFormat(String database, String table, Configuration config) throws Exception;
}
```

**Usage Example:**

```java
// Reading as HCatRecord (default)
HCatInputFormat<HCatRecord> recordFormat = new HCatInputFormat<>("sales", "transactions");

// Reading as a Flink Tuple. Note: getFields() and asFlinkTuples() are declared
// on HCatInputFormatBase, so declare the variable with the base type.
HCatInputFormatBase<Tuple3<String, Integer, Double>> tupleFormat =
    new HCatInputFormat<Tuple3<String, Integer, Double>>("sales", "transactions")
        .getFields("product_name", "quantity", "price")
        .asFlinkTuples();
```

### Scala HCatalog Input Format

Scala implementation supporting Scala tuples of up to 22 fields, or raw `HCatRecord` output.

```scala { .api }
/**
 * Scala HCatalog input format for reading from Hive tables.
 * Supports conversion to Scala tuples (up to 22 fields) or HCatRecord objects.
 */
class HCatInputFormat[T](database: String, table: String, config: Configuration)
    extends HCatInputFormatBase[T](database, table, config) {

  def this(database: String, table: String) {
    this(database, table, new Configuration)
  }
}
```

**Usage Example:**

```scala
// Reading as HCatRecord (default)
val recordFormat = new HCatInputFormat[HCatRecord]("sales", "transactions")

// Reading as a Scala tuple
val tupleFormat = new HCatInputFormat[(String, Int, Double)]("sales", "transactions")
  .getFields("product_name", "quantity", "price")
  .asFlinkTuples()
```

### Field Projection

Select and reorder specific fields from HCatalog tables to reduce data transfer and processing overhead.

```java { .api }
/**
 * Specifies the fields which are returned by the InputFormat and their order.
 * @param fields The fields and their order which are returned by the InputFormat
 * @return This InputFormat with specified return fields
 * @throws IOException if field projection fails
 */
public HCatInputFormatBase<T> getFields(String... fields) throws IOException;
```

**Usage Examples:**

```java
// Java: select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount");

// Java: reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date");
```

```scala
// Scala: select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount")

// Scala: reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date")
```
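
To illustrate what a projection conceptually involves, the hypothetical helper below (not part of the connector API) resolves each projected field name to its position in the full table schema, preserving the requested order, and rejects unknown fields, mirroring the documented validation behavior:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: resolve a projection against a table's full field list.
public class ProjectionSketch {
    public static List<Integer> resolvePositions(List<String> schemaFields,
                                                 String... projectedFields) {
        List<Integer> positions = new ArrayList<>();
        for (String field : projectedFields) {
            int pos = schemaFields.indexOf(field);
            if (pos < 0) {
                // Mirrors the documented behavior: unknown fields fail schema validation
                throw new IllegalArgumentException("Field not in schema: " + field);
            }
            positions.add(pos);
        }
        return positions;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("customer_id", "order_date", "total_amount");
        // Projection preserves the requested order, not the schema order
        System.out.println(resolvePositions(schema, "total_amount", "customer_id")); // [2, 0]
    }
}
```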

### Partition Filtering

Apply SQL-like filter conditions on partition columns to significantly reduce the amount of data read.

```java { .api }
/**
 * Specifies a SQL-like filter condition on the table's partition columns.
 * Filter conditions on non-partition columns are invalid.
 * @param filter A SQL-like filter condition on the table's partition columns
 * @return This InputFormat with specified partition filter
 * @throws IOException if filter application fails
 */
public HCatInputFormatBase<T> withFilter(String filter) throws IOException;
```

**Usage Examples:**

```java
// Java: single partition filter
hCatFormat.withFilter("year=2023");

// Java: multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15");

// Java: range and comparison operators
hCatFormat.withFilter("year>=2020 AND region IN ('US', 'EU')");
```

```scala
// Scala: single partition filter
hCatFormat.withFilter("year=2023")

// Scala: multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15")
```
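
Filter strings like the ones above are often assembled from known partition values. The hypothetical helper below (not part of the connector API) sketches one way to do that, quoting string values and AND-ing equality conditions into the form `withFilter()` accepts:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: build a SQL-like equality filter over partition columns.
public class FilterSketch {
    public static String equalityFilter(Map<String, Object> partitionValues) {
        StringJoiner joiner = new StringJoiner(" AND ");
        for (Map.Entry<String, Object> e : partitionValues.entrySet()) {
            Object v = e.getValue();
            // Quote string values; leave numeric values bare
            String literal = (v instanceof String) ? "'" + v + "'" : String.valueOf(v);
            joiner.add(e.getKey() + "=" + literal);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> parts = new LinkedHashMap<>();
        parts.put("year", 2023);
        parts.put("month", "01");
        System.out.println(equalityFilter(parts)); // year=2023 AND month='01'
    }
}
```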

### Tuple Conversion

Convert `HCatRecord` output to native Flink or Scala tuples for improved type safety and performance.

```java { .api }
/**
 * Specifies that the InputFormat returns Flink tuples instead of HCatRecord.
 * Note: Flink tuples might only support a limited number of fields (depending on the API).
 * @return This InputFormat configured to return tuples
 * @throws HCatException if tuple conversion setup fails
 */
public HCatInputFormatBase<T> asFlinkTuples() throws HCatException;
```

**Usage Examples:**

```java
// Java: convert to a Flink Tuple (up to 25 fields). The chained calls return
// HCatInputFormatBase, so declare the variable with the base type.
HCatInputFormatBase<Tuple2<String, Integer>> tupleFormat =
    new HCatInputFormat<Tuple2<String, Integer>>("mydb", "mytable")
        .getFields("name", "age")
        .asFlinkTuples();
```

```scala
// Scala: convert to a Scala tuple (up to 22 fields)
val tupleFormat = new HCatInputFormat[(String, Int)]("mydb", "mytable")
  .getFields("name", "age")
  .asFlinkTuples()
```
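
The documented field limits (25 for Flink Java tuples, 22 for Scala tuples) mean a projection wider than the target tuple arity must be rejected; this standalone sketch (the constant names and `checkArity` helper are hypothetical, not connector API) mirrors that check:

```java
// Hypothetical sketch of the tuple-arity validation described above.
public class TupleLimitSketch {
    public static final int MAX_JAVA_TUPLE_FIELDS = 25;
    public static final int MAX_SCALA_TUPLE_FIELDS = 22;

    public static void checkArity(int fieldCount, int maxFields) {
        if (fieldCount > maxFields) {
            // Mirrors the documented IllegalArgumentException for oversized projections
            throw new IllegalArgumentException(
                "Only up to " + maxFields + " fields can be returned as a tuple, got " + fieldCount);
        }
    }

    public static void main(String[] args) {
        checkArity(3, MAX_JAVA_TUPLE_FIELDS); // fine
        try {
            checkArity(23, MAX_SCALA_TUPLE_FIELDS); // too wide for a Scala tuple
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```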

### Configuration Access

Access and modify the underlying Hadoop configuration for advanced customization.

```java { .api }
/**
 * Returns the Hadoop Configuration of the HCatInputFormat.
 * @return The Configuration of the HCatInputFormat
 */
public Configuration getConfiguration();
```

**Usage Example:**

```java
// Access the configuration for customization
Configuration config = hCatFormat.getConfiguration();
config.set("hive.metastore.uris", "thrift://metastore:9083");
config.setInt("mapreduce.input.fileinputformat.split.minsize", 1024000);
```

### Schema Information

Retrieve schema information for the HCatalog table being read.

```java { .api }
/**
 * Returns the HCatSchema of the HCatRecord returned by this InputFormat.
 * @return The HCatSchema of the HCatRecords returned by this InputFormat
 */
public HCatSchema getOutputSchema();
```

**Usage Example:**

```java
// Inspect the table schema
HCatSchema schema = hCatFormat.getOutputSchema();
List<HCatFieldSchema> fields = schema.getFields();
for (HCatFieldSchema field : fields) {
    System.out.println("Field: " + field.getName() + ", Type: " + field.getType());
}
```

## Types

```java { .api }
/**
 * Abstract base class for HCatalog input formats.
 * Provides common functionality for reading from HCatalog tables.
 */
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
        implements ResultTypeQueryable<T> {

    protected HCatInputFormatBase();
    protected HCatInputFormatBase(String database, String table) throws IOException;
    protected HCatInputFormatBase(String database, String table, Configuration config) throws IOException;

    // Abstract methods implemented by the language-specific subclasses
    protected abstract int getMaxFlinkTupleSize();
    protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
}

/**
 * Hadoop input split wrapper for Flink integration
 */
class HadoopInputSplit implements InputSplit {
    // Implementation details handled internally
}

/**
 * HCatalog record representing a row of data from a Hive table.
 * Field values are accessible by name or position.
 */
interface HCatRecord {
    Object get(String fieldName, HCatSchema schema);
    Object get(int fieldPos);
    List<Object> getAll();
    // Additional methods for data access
}

/**
 * Schema definition for an HCatalog table.
 * Contains field definitions and metadata.
 */
class HCatSchema {
    List<HCatFieldSchema> getFields();
    HCatFieldSchema get(String fieldName);
    HCatFieldSchema get(int position);
    int getPosition(String fieldName);
    List<String> getFieldNames();
    // Additional schema methods
}

/**
 * Individual field schema definition
 */
class HCatFieldSchema {
    String getName();
    Type getType();
    String getComment();
    // Additional field metadata methods

    enum Type {
        INT, TINYINT, SMALLINT, BIGINT, BOOLEAN, FLOAT, DOUBLE,
        STRING, BINARY, ARRAY, MAP, STRUCT
    }
}
```

## Supported Data Types

The connector provides automatic type mapping between HCatalog and Flink types:

| HCatalog Type | Flink Type | Java Type | Scala Type |
|---------------|------------|-----------|------------|
| INT | BasicTypeInfo.INT_TYPE_INFO | Integer | Int |
| TINYINT | BasicTypeInfo.BYTE_TYPE_INFO | Byte | Byte |
| SMALLINT | BasicTypeInfo.SHORT_TYPE_INFO | Short | Short |
| BIGINT | BasicTypeInfo.LONG_TYPE_INFO | Long | Long |
| BOOLEAN | BasicTypeInfo.BOOLEAN_TYPE_INFO | Boolean | Boolean |
| FLOAT | BasicTypeInfo.FLOAT_TYPE_INFO | Float | Float |
| DOUBLE | BasicTypeInfo.DOUBLE_TYPE_INFO | Double | Double |
| STRING | BasicTypeInfo.STRING_TYPE_INFO | String | String |
| BINARY | PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO | byte[] | Array[Byte] |
| ARRAY | GenericTypeInfo(List.class) | List&lt;Object&gt; | List[Object] |
| MAP | GenericTypeInfo(Map.class) | Map&lt;Object, Object&gt; | Map[Object, Object] |
| STRUCT | GenericTypeInfo(List.class) | List&lt;Object&gt; | List[Object] |
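
The primitive rows of the table above can be expressed as a simple lookup, which is sometimes handy when processing `HCatRecord` values generically. This is an illustrative sketch only (the class and method names are hypothetical, not connector API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: map HCatalog primitive type names to the Java types
// the connector produces (complex types map to generic List/Map and are omitted).
public class TypeMappingSketch {
    private static final Map<String, Class<?>> PRIMITIVE_MAPPING = new HashMap<>();
    static {
        PRIMITIVE_MAPPING.put("INT", Integer.class);
        PRIMITIVE_MAPPING.put("TINYINT", Byte.class);
        PRIMITIVE_MAPPING.put("SMALLINT", Short.class);
        PRIMITIVE_MAPPING.put("BIGINT", Long.class);
        PRIMITIVE_MAPPING.put("BOOLEAN", Boolean.class);
        PRIMITIVE_MAPPING.put("FLOAT", Float.class);
        PRIMITIVE_MAPPING.put("DOUBLE", Double.class);
        PRIMITIVE_MAPPING.put("STRING", String.class);
        PRIMITIVE_MAPPING.put("BINARY", byte[].class);
    }

    public static Class<?> javaTypeFor(String hcatType) {
        Class<?> c = PRIMITIVE_MAPPING.get(hcatType);
        if (c == null) {
            throw new IllegalArgumentException("No primitive mapping for " + hcatType);
        }
        return c;
    }

    public static void main(String[] args) {
        System.out.println(javaTypeFor("BIGINT").getSimpleName()); // Long
    }
}
```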

## Error Handling

The connector surfaces several error conditions:

**Configuration Errors:**
- `IOException`: thrown for invalid database/table names, connection issues, or configuration problems
- `HCatException`: thrown for HCatalog-specific errors during schema access or filtering

**Type Conversion Errors:**
- `IllegalArgumentException`: thrown when requesting more fields than the tuple type supports (25 for Java, 22 for Scala)
- `RuntimeException`: thrown for unsupported partition key types (BINARY, ARRAY, MAP, STRUCT as partition keys)

**Usage Errors:**
- Filter conditions on non-partition columns cause runtime failures
- Requesting non-existent fields results in schema validation errors

**Example Error Handling:**

```java
try {
    HCatInputFormat<HCatRecord> format = new HCatInputFormat<>("mydb", "mytable");
    format.withFilter("year=2023");
    format.getFields("name", "age", "salary");
} catch (HCatException e) {
    // Catch the more specific HCatException first: it extends IOException,
    // so a preceding IOException clause would make this one unreachable.
    logger.error("HCatalog error: " + e.getMessage());
} catch (IOException e) {
    // Handle configuration or connection errors
    logger.error("Failed to configure HCatalog input: " + e.getMessage());
}
```
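
Because `HCatException` is a subclass of `IOException` in the Hive HCatalog library, catch-clause ordering matters: the subclass must come first or the broader clause swallows it. The standalone sketch below demonstrates this with a stand-in exception class (`FakeHCatException` and `classify` are hypothetical, used only so the example runs without Hive on the classpath):

```java
import java.io.IOException;

// Hypothetical sketch of catch ordering when a library exception subclasses IOException.
public class CatchOrderSketch {
    // Stand-in for org.apache.hive.hcatalog.common.HCatException,
    // which subclasses IOException in the real library.
    static class FakeHCatException extends IOException {
        FakeHCatException(String msg) { super(msg); }
    }

    public static String classify(IOException e) {
        try {
            throw e;
        } catch (FakeHCatException hce) {
            // Specific subclass first; swapping the clauses would make this unreachable
            return "hcatalog error";
        } catch (IOException ioe) {
            return "io/configuration error";
        }
    }

    public static String demo() {
        return classify(new FakeHCatException("bad filter")) + "|"
             + classify(new IOException("no metastore"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hcatalog error|io/configuration error
    }
}
```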