Apache Flink HCatalog connector for reading data from Apache Hive HCatalog tables
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hcatalog_2-11@1.14.00
# Flink HCatalog

Flink HCatalog is a connector library for Apache Flink that enables reading data from Apache Hive HCatalog tables. It provides both Java and Scala APIs, with support for schema projection, partition filtering, and automatic type mapping between HCatalog schemas and Flink's type system.

## Package Information

- **Package Name**: flink-hcatalog_2.11
- **Package Type**: maven
- **Language**: Java/Scala
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hcatalog_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

**Java:**
```java
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.hcatalog.HCatInputFormatBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.common.HCatException;
```

**Scala:**
```scala
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hive.hcatalog.data.HCatRecord
```

## Basic Usage

**Java Example:**
```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create HCatalog input format
HCatInputFormat<HCatRecord> hCatFormat = new HCatInputFormat<>("mydb", "mytable");

// Optional: Project specific fields
hCatFormat.getFields("name", "age", "city");

// Optional: Apply partition filter
hCatFormat.withFilter("year=2023 AND month='01'");

// Create DataSet and process
DataSet<HCatRecord> input = env.createInput(hCatFormat);
```

**Scala Example:**
```scala
import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hive.hcatalog.data.HCatRecord

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create HCatalog input format with field projection and a partition filter
val hCatFormat = new HCatInputFormat[HCatRecord]("mydb", "mytable")
  .getFields("name", "age", "city")
  .withFilter("year=2023")

// Create DataSet
val input = env.createInput(hCatFormat)
```

## Architecture

The Flink HCatalog connector is built around several key components:

- **Abstract Base Class**: `HCatInputFormatBase` provides the functionality shared by the Java and Scala implementations
- **Language-Specific Implementations**: separate `HCatInputFormat` classes in the `org.apache.flink.hcatalog.java` and `org.apache.flink.hcatalog.scala` packages handle tuple conversion for their respective languages
- **Type System Integration**: automatic mapping between HCatalog data types and Flink's type system
- **Hadoop Integration**: built on Hadoop's MapReduce input format framework for distributed data reading
- **Schema Management**: support for dynamic schema projection and filtering at the partition level
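
The split between the shared base class and thin per-language subclasses can be sketched with a simplified, self-contained model. The class and method names below loosely mirror `HCatInputFormatBase`, but the bodies are toy stand-ins rather than the real implementation:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for HCatInputFormatBase: shared projection logic lives here,
// while tuple arity and record conversion are deferred to language-specific subclasses.
abstract class SketchInputFormatBase<T> {
    protected List<String> fields;

    public SketchInputFormatBase<T> getFields(String... fields) {
        if (fields.length > getMaxTupleSize()) {
            throw new IllegalArgumentException(
                    "Only up to " + getMaxTupleSize() + " fields supported");
        }
        this.fields = Arrays.asList(fields);
        return this; // fluent, like the real API
    }

    // Each language binding declares its own tuple arity limit...
    protected abstract int getMaxTupleSize();

    // ...and its own record-to-tuple conversion.
    public abstract T buildTuple(List<Object> record);
}

// Toy "Java" subclass: 25-field limit, returns the row unchanged.
class SketchJavaInputFormat extends SketchInputFormatBase<List<Object>> {
    @Override protected int getMaxTupleSize() { return 25; }
    @Override public List<Object> buildTuple(List<Object> record) { return record; }
}
```

The real subclasses additionally wire the projected schema into Flink's type system; only the inheritance shape is shown here.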
## Capabilities

### Java HCatalog Input Format

The Java implementation supports Flink tuples of up to 25 fields, or HCatRecord output.

```java { .api }
/**
 * Java HCatalog input format for reading from Hive tables.
 * Supports conversion to Flink tuples (up to 25 fields) or HCatRecord objects.
 */
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
    public HCatInputFormat();
    public HCatInputFormat(String database, String table) throws IOException;
    public HCatInputFormat(String database, String table, Configuration config) throws IOException;
}
```

**Usage Example:**
```java
// Reading as HCatRecord (the default)
HCatInputFormat<HCatRecord> recordFormat = new HCatInputFormat<>("sales", "transactions");

// Reading as a Flink tuple; note that getFields() and asFlinkTuples()
// return the base type HCatInputFormatBase<T>
HCatInputFormatBase<Tuple3<String, Integer, Double>> tupleFormat =
    new HCatInputFormat<Tuple3<String, Integer, Double>>("sales", "transactions")
        .getFields("product_name", "quantity", "price")
        .asFlinkTuples();
```

### Scala HCatalog Input Format

The Scala implementation supports Scala tuples of up to 22 fields, or HCatRecord output.

```scala { .api }
/**
 * Scala HCatalog input format for reading from Hive tables.
 * Supports conversion to Scala tuples (up to 22 fields) or HCatRecord objects.
 */
class HCatInputFormat[T](database: String, table: String, config: Configuration)
    extends HCatInputFormatBase[T](database, table, config) {
  def this(database: String, table: String) = this(database, table, new Configuration)
}
```

**Usage Example:**
```scala
// Reading as HCatRecord (the default)
val recordFormat = new HCatInputFormat[HCatRecord]("sales", "transactions")

// Reading as a Scala tuple
val tupleFormat = new HCatInputFormat[(String, Int, Double)]("sales", "transactions")
  .getFields("product_name", "quantity", "price")
  .asFlinkTuples()
```

### Field Projection

Select and reorder specific fields of an HCatalog table to reduce data transfer and processing overhead.

```java { .api }
/**
 * Specifies the fields which are returned by the InputFormat and their order.
 * @param fields The fields and their order which are returned by the InputFormat
 * @return This InputFormat with specified return fields
 * @throws IOException if field projection fails
 */
public HCatInputFormatBase<T> getFields(String... fields) throws IOException;
```

**Usage Examples:**
```java
// Java: Select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount");

// Java: Reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date");
```

```scala
// Scala: Select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount")

// Scala: Reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date")
```

### Partition Filtering

Apply SQL-like filter conditions on partition columns to significantly reduce the amount of data read.

```java { .api }
/**
 * Specifies a SQL-like filter condition on the table's partition columns.
 * Filter conditions on non-partition columns are invalid.
 * @param filter A SQL-like filter condition on the table's partition columns
 * @return This InputFormat with specified partition filter
 * @throws IOException if filter application fails
 */
public HCatInputFormatBase<T> withFilter(String filter) throws IOException;
```

**Usage Examples:**
```java
// Java: Single partition filter
hCatFormat.withFilter("year=2023");

// Java: Multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15");

// Java: Range and comparison operators
hCatFormat.withFilter("year>=2020 AND region IN ('US', 'EU')");
```

```scala
// Scala: Single partition filter
hCatFormat.withFilter("year=2023")

// Scala: Multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15")
```

### Tuple Conversion

Convert HCatRecord output to native Flink or Scala tuples for improved type safety and performance.

```java { .api }
/**
 * Specifies that the InputFormat returns Flink tuples instead of HCatRecord.
 * Note: tuples support only a limited number of fields (25 in Java, 22 in Scala).
 * @return This InputFormat configured to return tuples
 * @throws HCatException if tuple conversion setup fails
 */
public HCatInputFormatBase<T> asFlinkTuples() throws HCatException;
```

**Usage Examples:**
```java
// Java: Convert to a Flink tuple (up to 25 fields); the chained calls
// return the base type HCatInputFormatBase<T>
HCatInputFormatBase<Tuple2<String, Integer>> tupleFormat =
    new HCatInputFormat<Tuple2<String, Integer>>("mydb", "mytable")
        .getFields("name", "age")
        .asFlinkTuples();
```

```scala
// Scala: Convert to a Scala tuple (up to 22 fields)
val tupleFormat = new HCatInputFormat[(String, Int)]("mydb", "mytable")
  .getFields("name", "age")
  .asFlinkTuples()
```

### Configuration Access

Access and modify the underlying Hadoop configuration for advanced customization.

```java { .api }
/**
 * Returns the Hadoop Configuration of the HCatInputFormat.
 * @return The Configuration of the HCatInputFormat
 */
public Configuration getConfiguration();
```

**Usage Example:**
```java
// Access configuration for customization
Configuration config = hCatFormat.getConfiguration();
config.set("hive.metastore.uris", "thrift://metastore:9083");
config.setInt("mapreduce.input.fileinputformat.split.minsize", 1024000);
```

### Schema Information

Retrieve schema information for the HCatalog table being read.

```java { .api }
/**
 * Returns the HCatSchema of the HCatRecords returned by this InputFormat.
 * @return The HCatSchema of the HCatRecords returned by this InputFormat
 */
public HCatSchema getOutputSchema();
```

**Usage Example:**
```java
// Inspect table schema
HCatSchema schema = hCatFormat.getOutputSchema();
List<HCatFieldSchema> fields = schema.getFields();
for (HCatFieldSchema field : fields) {
    System.out.println("Field: " + field.getName() + ", Type: " + field.getType());
}
```

## Types

```java { .api }
/**
 * Abstract base class for HCatalog input formats.
 * Provides common functionality for reading from HCatalog tables.
 */
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
        implements ResultTypeQueryable<T> {

    protected HCatInputFormatBase();
    protected HCatInputFormatBase(String database, String table) throws IOException;
    protected HCatInputFormatBase(String database, String table, Configuration config) throws IOException;

    // Abstract methods to be implemented by language-specific subclasses
    protected abstract int getMaxFlinkTupleSize();
    protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
}

/**
 * Hadoop input split wrapper for Flink integration
 */
class HadoopInputSplit implements InputSplit {
    // Implementation details handled internally
}

/**
 * HCatalog record representing a row of data from a Hive table.
 * Field values are accessible by name or position.
 */
interface HCatRecord {
    Object get(String fieldName, HCatSchema schema);
    Object get(int fieldPos);
    List<Object> getAll();
    // Additional methods for data access
}

/**
 * Schema definition for an HCatalog table.
 * Contains field definitions and metadata.
 */
class HCatSchema {
    List<HCatFieldSchema> getFields();
    HCatFieldSchema get(String fieldName);
    HCatFieldSchema get(int position);
    int getPosition(String fieldName);
    List<String> getFieldNames();
    // Additional schema methods
}

/**
 * Individual field schema definition
 */
class HCatFieldSchema {
    String getName();
    Type getType();
    String getComment();
    // Additional field metadata methods

    enum Type {
        INT, TINYINT, SMALLINT, BIGINT, BOOLEAN, FLOAT, DOUBLE,
        STRING, BINARY, ARRAY, MAP, STRUCT
    }
}
```

## Supported Data Types

The connector provides automatic type mapping between HCatalog and Flink types:

| HCatalog Type | Flink Type | Java Type | Scala Type |
|---------------|------------|-----------|------------|
| INT | BasicTypeInfo.INT_TYPE_INFO | Integer | Int |
| TINYINT | BasicTypeInfo.BYTE_TYPE_INFO | Byte | Byte |
| SMALLINT | BasicTypeInfo.SHORT_TYPE_INFO | Short | Short |
| BIGINT | BasicTypeInfo.LONG_TYPE_INFO | Long | Long |
| BOOLEAN | BasicTypeInfo.BOOLEAN_TYPE_INFO | Boolean | Boolean |
| FLOAT | BasicTypeInfo.FLOAT_TYPE_INFO | Float | Float |
| DOUBLE | BasicTypeInfo.DOUBLE_TYPE_INFO | Double | Double |
| STRING | BasicTypeInfo.STRING_TYPE_INFO | String | String |
| BINARY | PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO | byte[] | Array[Byte] |
| ARRAY | GenericTypeInfo(List.class) | List<Object> | List[Object] |
| MAP | GenericTypeInfo(Map.class) | Map<Object, Object> | Map[Object, Object] |
| STRUCT | GenericTypeInfo(List.class) | List<Object> | List[Object] |

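
As a quick illustration of the Java-side column types, a row can be modeled as the `List<Object>` that `HCatRecord.getAll()` would return. The values below are made-up sample data, not output from a real table:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TypeMappingSketch {
    // Builds a hypothetical row whose element types follow the mapping table:
    // INT -> Integer, STRING -> String, DOUBLE -> Double,
    // BINARY -> byte[], ARRAY -> List<Object>, MAP -> Map<Object, Object>.
    static List<Object> sampleRow() {
        Map<Object, Object> tags = new HashMap<>();
        tags.put("region", "US");
        return Arrays.asList(
                42,                     // INT
                "widget",               // STRING
                9.99,                   // DOUBLE
                new byte[] {1, 2, 3},   // BINARY
                Arrays.asList(1, 2, 3), // ARRAY
                tags                    // MAP
        );
    }
}
```

Positional access then mirrors `HCatRecord.get(int)`: `(Integer) row.get(0)`, `(byte[]) row.get(3)`, and so on.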
## Error Handling

The connector surfaces several error conditions:

**Configuration Errors:**
- `IOException`: thrown for invalid database/table names, connection issues, or configuration problems
- `HCatException`: thrown for HCatalog-specific errors during schema access or filtering

**Type Conversion Errors:**
- `IllegalArgumentException`: thrown when requesting more fields than the tuple type supports (25 for Java, 22 for Scala)
- `RuntimeException`: thrown for unsupported partition key types (BINARY, ARRAY, MAP, or STRUCT partition keys)

**Usage Errors:**
- Invalid filter conditions on non-partition columns cause runtime failures
- Requesting non-existent fields results in schema validation errors

**Example Error Handling:**
```java
try {
    HCatInputFormat<HCatRecord> format = new HCatInputFormat<>("mydb", "mytable");
    format.withFilter("year=2023");
    format.getFields("name", "age", "salary");
} catch (HCatException e) {
    // Handle HCatalog-specific errors first (HCatException extends IOException)
    logger.error("HCatalog error: " + e.getMessage());
} catch (IOException e) {
    // Handle configuration or connection errors
    logger.error("Failed to configure HCatalog input: " + e.getMessage());
}
```