Apache Flink HCatalog connector for reading data from Apache Hive HCatalog tables
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hcatalog_2-11@1.14.0

Flink HCatalog is a connector library for Apache Flink that enables reading data from Apache Hive HCatalog tables. It provides both Java and Scala APIs with support for schema projection, partition filtering, and automatic type mapping between HCatalog schemas and Flink's type system.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hcatalog_2.11</artifactId>
<version>1.14.6</version>
</dependency>

Java:
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.hcatalog.HCatInputFormatBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.common.HCatException;

Scala:
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hive.hcatalog.data.HCatRecord

Java Example:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;
// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create HCatalog input format
HCatInputFormat<HCatRecord> hCatFormat = new HCatInputFormat<>("mydb", "mytable");
// Optional: Project specific fields
hCatFormat.getFields("name", "age", "city");
// Optional: Apply partition filter
hCatFormat.withFilter("year=2023 AND month='01'");
// Create DataSet and process
DataSet<HCatRecord> input = env.createInput(hCatFormat);
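The resulting DataSet can then be consumed like any other Flink batch source. As a minimal sketch (assuming the format configured above), printing a small sample of the table also triggers execution of the batch job:

// Take a few records and print them; print() executes the DataSet program
input.first(10).print();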
Scala Example:

import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hive.hcatalog.data.HCatRecord
// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Create HCatalog input format with field projection
val hCatFormat = new HCatInputFormat[HCatRecord]("mydb", "mytable")
.getFields("name", "age", "city")
.withFilter("year=2023")
// Create DataSet
val input = env.createInput(hCatFormat)

The Flink HCatalog connector is built around several key components:
- HCatInputFormatBase provides common functionality for both Java and Scala implementations
- Language-specific input formats for Java (HCatInputFormat) and Scala (HCatInputFormat) that handle tuple conversion

Java implementation supporting Flink Tuples up to 25 fields and HCatRecord output.
/**
* Java HCatalog input format for reading from Hive tables
* Supports conversion to Flink Tuples (up to 25 fields) or HCatRecord objects
*/
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
public HCatInputFormat();
public HCatInputFormat(String database, String table) throws Exception;
public HCatInputFormat(String database, String table, Configuration config) throws Exception;
}

Usage Example:
// Reading as HCatRecord (default)
HCatInputFormat<HCatRecord> recordFormat = new HCatInputFormat<>("sales", "transactions");
// Reading as Flink Tuple
HCatInputFormatBase<Tuple3<String, Integer, Double>> tupleFormat =
    new HCatInputFormat<Tuple3<String, Integer, Double>>("sales", "transactions")
        .getFields("product_name", "quantity", "price")
        .asFlinkTuples();

Scala implementation supporting Scala tuples up to 22 fields and HCatRecord output.
/**
* Scala HCatalog input format for reading from Hive tables
* Supports conversion to Scala tuples (up to 22 fields) or HCatRecord objects
*/
class HCatInputFormat[T](database: String, table: String, config: Configuration)
extends HCatInputFormatBase[T](database, table, config) {
def this(database: String, table: String) {
this(database, table, new Configuration)
}
}

Usage Example:
// Reading as HCatRecord (default)
val recordFormat = new HCatInputFormat[HCatRecord]("sales", "transactions")
// Reading as Scala Tuple
val tupleFormat = new HCatInputFormat[(String, Int, Double)]("sales", "transactions")
.getFields("product_name", "quantity", "price")
  .asFlinkTuples()

Select and reorder specific fields from HCatalog tables to reduce data transfer and processing overhead.
/**
* Specifies the fields which are returned by the InputFormat and their order
* @param fields The fields and their order which are returned by the InputFormat
* @return This InputFormat with specified return fields
* @throws IOException if field projection fails
*/
public HCatInputFormatBase<T> getFields(String... fields) throws IOException;

Usage Examples:
// Java: Select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount");
// Java: Reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date");// Scala: Select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount")
// Scala: Reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date")Apply SQL-like filter conditions on partition columns to significantly reduce the amount of data to be read.
/**
* Specifies a SQL-like filter condition on the table's partition columns
* Filter conditions on non-partition columns are invalid
* @param filter A SQL-like filter condition on the table's partition columns
* @return This InputFormat with specified partition filter
* @throws IOException if filter application fails
*/
public HCatInputFormatBase<T> withFilter(String filter) throws IOException;

Usage Examples:
// Java: Single partition filter
hCatFormat.withFilter("year=2023");
// Java: Multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15");
// Java: Range and comparison operators
hCatFormat.withFilter("year>=2020 AND region IN ('US', 'EU')");// Scala: Single partition filter
hCatFormat.withFilter("year=2023")
// Scala: Multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15")Convert HCatRecord output to native Flink or Scala tuples for improved type safety and performance.
/**
* Specifies that the InputFormat returns Flink tuples instead of HCatRecord
* Note: tuple conversion supports a limited number of fields (25 for the Java API, 22 for the Scala API)
* @return This InputFormat configured to return tuples
* @throws HCatException if tuple conversion setup fails
*/
public HCatInputFormatBase<T> asFlinkTuples() throws HCatException;

Usage Examples:
// Java: Convert to Flink Tuple (up to 25 fields)
HCatInputFormatBase<Tuple2<String, Integer>> tupleFormat =
    new HCatInputFormat<Tuple2<String, Integer>>("mydb", "mytable")
        .getFields("name", "age")
        .asFlinkTuples();

// Scala: Convert to Scala tuple (up to 22 fields)
val tupleFormat = new HCatInputFormat[(String, Int)]("mydb", "mytable")
.getFields("name", "age")
  .asFlinkTuples()

Access and modify the underlying Hadoop configuration for advanced customization.
/**
* Returns the Hadoop Configuration of the HCatInputFormat
* @return The Configuration of the HCatInputFormat
*/
public Configuration getConfiguration();

Usage Example:
// Access configuration for customization
Configuration config = hCatFormat.getConfiguration();
config.set("hive.metastore.uris", "thrift://metastore:9083");
config.setInt("mapreduce.input.fileinputformat.split.minsize", 1024000);
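Alternatively, a fully prepared Hadoop Configuration can be passed to the constructor shown earlier instead of mutating the one returned by getConfiguration(). A minimal sketch, reusing the metastore URI from the example above:

// Build the Hadoop/Hive client configuration up front
Configuration hadoopConf = new Configuration();
hadoopConf.set("hive.metastore.uris", "thrift://metastore:9083");

// Hand it to the input format at construction time
HCatInputFormat<HCatRecord> preconfigured = new HCatInputFormat<>("mydb", "mytable", hadoopConf);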
Retrieve schema information for the HCatalog table being read.

/**
* Returns the HCatSchema of the HCatRecord returned by this InputFormat
* @return The HCatSchema of the HCatRecords returned by this InputFormat
*/
public HCatSchema getOutputSchema();

Usage Example:
// Inspect table schema
HCatSchema schema = hCatFormat.getOutputSchema();
List<HCatFieldSchema> fields = schema.getFields();
for (HCatFieldSchema field : fields) {
System.out.println("Field: " + field.getName() + ", Type: " + field.getType());
}

/**
* Abstract base class for HCatalog input formats
* Provides common functionality for reading from HCatalog tables
*/
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
implements ResultTypeQueryable<T> {
protected HCatInputFormatBase();
protected HCatInputFormatBase(String database, String table) throws IOException;
protected HCatInputFormatBase(String database, String table, Configuration config) throws IOException;
// Abstract methods to be implemented by language-specific subclasses
protected abstract int getMaxFlinkTupleSize();
protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
}
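The two protected hooks are how the Java and Scala subclasses cap tuple arity and copy HCatalog fields into tuple instances. A hypothetical pass-through subclass (a sketch based only on the signatures above, not the shipped implementation) could satisfy them like this:

// Hypothetical subclass that skips tuple conversion and always emits HCatRecord
public class PassThroughHCatInputFormat extends HCatInputFormatBase<HCatRecord> {

    public PassThroughHCatInputFormat(String database, String table) throws IOException {
        super(database, table);
    }

    @Override
    protected int getMaxFlinkTupleSize() {
        // This sketch does not support tuple output, so any asFlinkTuples() call would fail
        return 0;
    }

    @Override
    protected HCatRecord buildFlinkTuple(HCatRecord tuple, HCatRecord record) throws HCatException {
        // Pass the HCatalog record through unchanged
        return record;
    }
}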
/**
* Hadoop input split wrapper for Flink integration
*/
class HadoopInputSplit implements InputSplit {
// Implementation details handled internally
}
/**
* HCatalog record representing a row of data from Hive table
* Contains field values accessible by name or position
*/
interface HCatRecord {
Object get(String fieldName, HCatSchema schema);
Object get(int fieldPos);
List<Object> getAll();
// Additional methods for data access
}
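Records produced by the input format can be unpacked in a downstream transformation. A sketch, assuming the quick-start DataSet<HCatRecord> named input and illustrative projected fields "name" (STRING) and "age" (INT):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

DataSet<Tuple2<String, Integer>> people = input.map(
    new MapFunction<HCatRecord, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(HCatRecord record) {
            // Positional access follows the projected field order; casts mirror the type mapping below
            String name = (String) record.get(0);
            Integer age = (Integer) record.get(1);
            return Tuple2.of(name, age);
        }
    });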
/**
* Schema definition for HCatalog table structure
* Contains field definitions and metadata
*/
class HCatSchema {
List<HCatFieldSchema> getFields();
HCatFieldSchema get(String fieldName);
HCatFieldSchema get(int position);
int getPosition(String fieldName);
List<String> getFieldNames();
// Additional schema methods
}
/**
* Individual field schema definition
*/
class HCatFieldSchema {
String getName();
Type getType();
String getComment();
// Additional field metadata methods
enum Type {
INT, TINYINT, SMALLINT, BIGINT, BOOLEAN, FLOAT, DOUBLE,
STRING, BINARY, ARRAY, MAP, STRUCT
}
}

The connector provides automatic type mapping between HCatalog and Flink types:
| HCatalog Type | Flink Type | Java Type | Scala Type |
|---|---|---|---|
| INT | BasicTypeInfo.INT_TYPE_INFO | Integer | Int |
| TINYINT | BasicTypeInfo.BYTE_TYPE_INFO | Byte | Byte |
| SMALLINT | BasicTypeInfo.SHORT_TYPE_INFO | Short | Short |
| BIGINT | BasicTypeInfo.LONG_TYPE_INFO | Long | Long |
| BOOLEAN | BasicTypeInfo.BOOLEAN_TYPE_INFO | Boolean | Boolean |
| FLOAT | BasicTypeInfo.FLOAT_TYPE_INFO | Float | Float |
| DOUBLE | BasicTypeInfo.DOUBLE_TYPE_INFO | Double | Double |
| STRING | BasicTypeInfo.STRING_TYPE_INFO | String | String |
| BINARY | PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO | byte[] | Array[Byte] |
| ARRAY | GenericTypeInfo(List.class) | List<Object> | List[Object] |
| MAP | GenericTypeInfo(Map.class) | Map<Object, Object> | Map[Object, Object] |
| STRUCT | GenericTypeInfo(List.class) | List<Object> | List[Object] |
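When asFlinkTuples() is used, this mapping determines the tuple field types. A sketch, assuming a hypothetical table mydb.events whose projected columns event_name, event_count and score are STRING, BIGINT and DOUBLE:

// STRING -> String, BIGINT -> Long, DOUBLE -> Double, per the table above
HCatInputFormatBase<Tuple3<String, Long, Double>> typedFormat =
    new HCatInputFormat<Tuple3<String, Long, Double>>("mydb", "events")
        .getFields("event_name", "event_count", "score")
        .asFlinkTuples();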
The connector handles various error conditions:
Configuration Errors:
- IOException: Thrown for invalid database/table names, connection issues, or configuration problems
- HCatException: Thrown for HCatalog-specific errors during schema access or filtering

Type Conversion Errors:
- IllegalArgumentException: Thrown when requesting more fields than supported by the tuple type (25 for Java, 22 for Scala)
- RuntimeException: Thrown for unsupported partition key types (BINARY, ARRAY, MAP, STRUCT as partition keys)

Usage Errors:
Example Error Handling:
try {
    HCatInputFormat<HCatRecord> format = new HCatInputFormat<>("mydb", "mytable");
    format.withFilter("year=2023");
    format.getFields("name", "age", "salary");
} catch (HCatException e) {
    // Handle HCatalog-specific errors first: HCatException extends IOException
    logger.error("HCatalog error: " + e.getMessage());
} catch (IOException e) {
    // Handle configuration or connection errors
    logger.error("Failed to configure HCatalog input: " + e.getMessage());
} catch (Exception e) {
    // The constructors declare a broad checked Exception
    logger.error("Failed to create HCatalog input format: " + e.getMessage());
}
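The tuple-arity limit surfaces as an unchecked IllegalArgumentException when asFlinkTuples() is called for a table (or projection) with too many columns. A sketch, assuming a hypothetical wide table mydb.wide_table:

try {
    HCatInputFormat<HCatRecord> wideFormat = new HCatInputFormat<>("mydb", "wide_table");
    // Throws IllegalArgumentException if more than 25 fields would be returned (Java API)
    wideFormat.asFlinkTuples();
} catch (IllegalArgumentException e) {
    // Fall back to HCatRecord output, or project fewer fields with getFields(...)
    logger.warn("Too many columns for tuple output: " + e.getMessage());
} catch (Exception e) {
    // Covers the constructor's broad checked Exception and HCatException from asFlinkTuples()
    logger.error("Failed to configure HCatalog input: " + e.getMessage());
}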