# HCatalog Connector

Apache Hive HCatalog metadata integration for Flink batch processing, enabling access to Hive tables with schema support, partition filtering, and automatic type mapping.

## Capabilities

### HCatInputFormatBase

Abstract base InputFormat for reading from HCatalog tables with comprehensive configuration options.

```java { .api }
/**
 * Abstract base InputFormat for reading from HCatalog tables
 * @param <T> The type of records produced (typically HCatRecord or Flink Tuple)
 */
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
        implements ResultTypeQueryable<T> {

    /**
     * Default constructor using default database and table from context
     */
    public HCatInputFormatBase();

    /**
     * Creates HCatInputFormatBase for a specific database and table
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     */
    public HCatInputFormatBase(String database, String table);

    /**
     * Creates HCatInputFormatBase with custom Hadoop configuration
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     * @param config Hadoop Configuration with HCatalog settings
     */
    public HCatInputFormatBase(String database, String table, Configuration config);

    /**
     * Specifies which fields to return and their order
     * @param fields Array of field names to include in the output
     * @return This instance for method chaining
     */
    public HCatInputFormatBase<T> getFields(String... fields);

    /**
     * Specifies partition filter condition for partition pruning
     * @param filter Partition filter expression (e.g., "year=2023 AND month=12")
     * @return This instance for method chaining
     */
    public HCatInputFormatBase<T> withFilter(String filter);

    /**
     * Configures the format to return Flink tuples instead of HCatRecord
     * @return This instance for method chaining
     */
    public HCatInputFormatBase<T> asFlinkTuples();

    /**
     * Returns the Hadoop Configuration used by this format
     * @return Hadoop Configuration instance
     */
    public Configuration getConfiguration();

    /**
     * Returns the HCatalog schema for the output data
     * @return HCatSchema describing the table structure
     */
    public HCatSchema getOutputSchema();

    /**
     * Returns the type information for the records produced by this format
     * @return TypeInformation describing the output type
     */
    public TypeInformation<T> getProducedType();

    /**
     * Returns the maximum tuple size supported by this format implementation.
     * Subclasses define the specific limit (e.g., 25 for the standard Java API)
     * @return Maximum number of fields supported in Flink tuples
     */
    protected abstract int getMaxFlinkTupleSize();

    /**
     * Builds a Flink tuple from an HCatRecord
     * @param t The tuple instance to populate (may be reused)
     * @param record The HCatRecord containing the data
     * @return Populated Flink tuple
     */
    protected abstract T buildFlinkTuple(T t, HCatRecord record);
}
```
### HCatInputFormat

Concrete HCatalog InputFormat for the Java API with support for up to 25 tuple fields.

```java { .api }
/**
 * Concrete HCatalog InputFormat for Java API with max 25 tuple fields
 * @param <T> The Flink tuple type (Tuple1 through Tuple25)
 */
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {

    /**
     * Default constructor using default database and table from context
     */
    public HCatInputFormat();

    /**
     * Creates HCatInputFormat for a specific database and table
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     */
    public HCatInputFormat(String database, String table);

    /**
     * Creates HCatInputFormat with custom Hadoop configuration
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     * @param config Hadoop Configuration with HCatalog settings
     */
    public HCatInputFormat(String database, String table, Configuration config);
}
```
**Basic Usage Example:**

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read from Hive table
HCatInputFormat<Tuple3<String, Integer, String>> hcatInput =
    new HCatInputFormat<>("mydb", "users");

// Configure which fields to read
hcatInput
    .getFields("name", "age", "city") // Select specific columns
    .withFilter("age > 18 AND city='New York'") // Partition filter (referenced columns must be partition keys)
    .asFlinkTuples(); // Return as Flink tuples

DataSet<Tuple3<String, Integer, String>> users = env.createInput(hcatInput);
users.print();
```

**Advanced Configuration Example:**

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.api.java.tuple.Tuple5;

// Configure Hadoop/Hive settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("hive.metastore.uris", "thrift://localhost:9083");
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");

// Create input format with custom configuration
HCatInputFormat<Tuple5<String, Integer, String, Double, Long>> salesInput =
    new HCatInputFormat<>("sales_db", "transactions", hadoopConfig);

// Configure for complex query
salesInput
    .getFields("customer_id", "quantity", "product", "amount", "timestamp")
    .withFilter("year=2023 AND month>=10 AND region='US'") // Partition pruning
    .asFlinkTuples();

DataSet<Tuple5<String, Integer, String, Double, Long>> sales = env.createInput(salesInput);

// Process sales data
DataSet<Tuple2<String, Double>> customerTotals = sales
    .groupBy(0) // Group by customer_id
    .aggregate(Aggregations.SUM, 3) // Sum amounts
    .project(0, 3); // Keep customer_id and total

customerTotals.print();
```
## Schema Handling

### Automatic Type Mapping

HCatalog automatically maps Hive types to Flink types:

```java
// Hive Schema -> Flink Types
// STRING     -> String
// INT        -> Integer
// BIGINT     -> Long
// DOUBLE     -> Double
// BOOLEAN    -> Boolean
// ARRAY<T>   -> List<T>
// MAP<K,V>   -> Map<K,V>
// STRUCT     -> Complex types (limited support)
```

### Working with Complex Types

```java
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.flink.api.java.tuple.Tuple4;

// For tables with complex types, you may need custom processing
public class ComplexHCatInputFormat extends HCatInputFormatBase<Tuple4<String, List<String>, Map<String, Integer>, String>> {

    public ComplexHCatInputFormat(String database, String table) {
        super(database, table);
    }

    @Override
    protected int getMaxFlinkTupleSize() {
        return 25; // Standard Java API limit
    }

    @Override
    protected Tuple4<String, List<String>, Map<String, Integer>, String> buildFlinkTuple(
            Tuple4<String, List<String>, Map<String, Integer>, String> t,
            HCatRecord record) {

        // Extract fields by position; note that HCatRecord has no get(String)
        // overload — name-based access requires get(String, HCatSchema).
        String id = (String) record.get(0);     // "id"
        String status = (String) record.get(3); // "status"

        // Extract complex fields
        @SuppressWarnings("unchecked")
        List<String> tags = (List<String>) record.get(1); // "tags"

        @SuppressWarnings("unchecked")
        Map<String, Integer> metrics = (Map<String, Integer>) record.get(2); // "metrics"

        return new Tuple4<>(id, tags, metrics, status);
    }
}
```
## Partition Management

### Partition Filtering

Efficient partition pruning reduces data processing overhead:

```java
// Partition filter examples
hcatInput.withFilter("year=2023");                            // Single partition
hcatInput.withFilter("year=2023 AND month=12");               // Multiple partitions
hcatInput.withFilter("year>=2022 AND region IN ('US','EU')"); // Range and set filters
hcatInput.withFilter("year=2023 AND month>=6 AND month<=12"); // Range filters
```

### Dynamic Partition Discovery

```java
import org.apache.flink.api.java.tuple.Tuple6;

// Read from partitioned table with partition columns included
HCatInputFormat<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedInput =
    new HCatInputFormat<>("warehouse", "sales_partitioned");

partitionedInput
    .getFields("customer", "amount", "product", "region", "year", "month") // Include partition columns
    .withFilter("year=2023 AND region IN ('US','EU')")
    .asFlinkTuples();

DataSet<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedSales =
    env.createInput(partitionedInput);

// Partition information is available in the data
partitionedSales
    .filter(tuple -> tuple.f4 == 2023 && tuple.f5 >= 10) // Additional filtering by partition columns
    .print();
```
## Performance Optimization

### Input Split Configuration

```java
// Configure input split size for better parallelism
Configuration config = new Configuration();
config.setLong("mapreduce.input.fileinputformat.split.maxsize", 134217728L); // 128MB splits
config.setInt("hive.exec.reducers.max", 200); // Maximum number of reducers

HCatInputFormat<Tuple3<String, Double, String>> optimizedInput =
    new HCatInputFormat<>("bigdata", "large_table", config);
```

### Columnar Storage Integration

```java
// For ORC or Parquet tables, specify columns for projection pushdown
hcatInput
    .getFields("id", "name", "amount") // Only read needed columns
    .withFilter("year=2023"); // Partition pruning

// This enables:
// - Column projection (reduces I/O)
// - Partition pruning (reduces data scanned)
// - Predicate pushdown (when supported by storage format)
```
## Error Handling

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// Handle potential null values and type conversion errors
DataSet<Tuple3<String, Integer, String>> safeUsers = users
    .map(new MapFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>>() {
        @Override
        public Tuple3<String, Integer, String> map(Tuple3<String, Integer, String> value) {
            // Handle null values
            String name = value.f0 != null ? value.f0 : "Unknown";
            Integer age = value.f1 != null ? value.f1 : 0;
            String city = value.f2 != null ? value.f2 : "Unknown";

            return new Tuple3<>(name, age, city);
        }
    });
```
## Common Types

```java { .api }
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import java.util.List;
import java.util.Map;
```