Flink HCatalog

Flink HCatalog is a connector library for Apache Flink that enables reading data from Apache Hive HCatalog tables. It provides both Java and Scala APIs with support for schema projection, partition filtering, and automatic type mapping between HCatalog schemas and Flink's type system.

Package Information

  • Package Name: flink-hcatalog_2.11
  • Package Type: maven
  • Language: Java/Scala
  • Installation: Add to your Maven pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hcatalog_2.11</artifactId>
    <version>1.14.6</version>
</dependency>

Core Imports

Java:

import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.hcatalog.HCatInputFormatBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.common.HCatException;

Scala:

import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hive.hcatalog.data.HCatRecord

Basic Usage

Java Example:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.hive.hcatalog.data.HCatRecord;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create HCatalog input format
HCatInputFormat<HCatRecord> hCatFormat = new HCatInputFormat<>("mydb", "mytable");

// Optional: Project specific fields
hCatFormat.getFields("name", "age", "city");

// Optional: Apply partition filter
hCatFormat.withFilter("year=2023 AND month='01'");

// Create DataSet and process
DataSet<HCatRecord> input = env.createInput(hCatFormat);
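The DataSet can then be processed like any other Flink DataSet. A minimal continuation of the example above, assuming the projected name, age, and city fields are read back by position:

// Access projected fields by position (order follows getFields()) and print the result
input.map(record -> record.get(0) + " (" + record.get(1) + ") lives in " + record.get(2))
     .returns(String.class)
     .print();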

Scala Example:

import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hive.hcatalog.data.HCatRecord

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create HCatalog input format with field projection
val hCatFormat = new HCatInputFormat[HCatRecord]("mydb", "mytable")
  .getFields("name", "age", "city")
  .withFilter("year=2023")

// Create DataSet
val input = env.createInput(hCatFormat)

Architecture

The Flink HCatalog connector is built around several key components:

  • Abstract Base Class: HCatInputFormatBase provides common functionality for both Java and Scala implementations
  • Language-Specific Implementations: Separate classes for Java (org.apache.flink.hcatalog.java.HCatInputFormat) and Scala (org.apache.flink.hcatalog.scala.HCatInputFormat) that handle tuple conversion
  • Type System Integration: Automatic mapping between HCatalog data types and Flink's type system
  • Hadoop Integration: Built on top of Hadoop's MapReduce input format framework for distributed data reading
  • Schema Management: Support for dynamic schema projection and filtering at the partition level
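
To illustrate how these pieces fit together, the sketch below shows the shape of a language binding built on HCatInputFormatBase. It is illustrative only (CustomHCatInputFormat is a hypothetical class, not part of the shipped API); the two abstract methods it implements are listed under Types below.

import java.io.IOException;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.hcatalog.HCatInputFormatBase;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;

// Illustrative sketch: the shape of a language-specific implementation
public class CustomHCatInputFormat<T> extends HCatInputFormatBase<T> {

    public CustomHCatInputFormat(String database, String table) throws IOException {
        super(database, table);
    }

    @Override
    protected int getMaxFlinkTupleSize() {
        // Upper bound on the tuple arity this implementation can produce
        return 25;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected T buildFlinkTuple(T t, HCatRecord record) throws HCatException {
        // Copy each field of the HCatRecord into the reusable Flink tuple instance
        Tuple tuple = (Tuple) t;
        for (int i = 0; i < tuple.getArity(); i++) {
            tuple.setField(record.get(i), i);
        }
        return (T) tuple;
    }
}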

Capabilities

Java HCatalog Input Format

Java implementation supporting Flink Tuples up to 25 fields and HCatRecord output.

/**
 * Java HCatalog input format for reading from Hive tables
 * Supports conversion to Flink Tuples (up to 25 fields) or HCatRecord objects
 */
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
    public HCatInputFormat();
    public HCatInputFormat(String database, String table) throws Exception;
    public HCatInputFormat(String database, String table, Configuration config) throws Exception;
}

Usage Example:

// Reading as HCatRecord (default)
HCatInputFormat<HCatRecord> recordFormat = new HCatInputFormat<>("sales", "transactions");

// Reading as Flink Tuple
// getFields() and asFlinkTuples() return the base type, so declare the variable as HCatInputFormatBase
HCatInputFormatBase<Tuple3<String, Integer, Double>> tupleFormat =
    new HCatInputFormat<Tuple3<String, Integer, Double>>("sales", "transactions")
        .getFields("product_name", "quantity", "price")
        .asFlinkTuples();

Scala HCatalog Input Format

Scala implementation supporting Scala tuples up to 22 fields and HCatRecord output.

/**
 * Scala HCatalog input format for reading from Hive tables
 * Supports conversion to Scala tuples (up to 22 fields) or HCatRecord objects
 */
class HCatInputFormat[T](database: String, table: String, config: Configuration) 
    extends HCatInputFormatBase[T](database, table, config) {
  def this(database: String, table: String) {
    this(database, table, new Configuration)
  }
}

Usage Example:

// Reading as HCatRecord (default)
val recordFormat = new HCatInputFormat[HCatRecord]("sales", "transactions")

// Reading as Scala Tuple
val tupleFormat = new HCatInputFormat[(String, Int, Double)]("sales", "transactions")
  .getFields("product_name", "quantity", "price")
  .asFlinkTuples()

Field Projection

Select and reorder specific fields from HCatalog tables to reduce data transfer and processing overhead.

/**
 * Specifies the fields which are returned by the InputFormat and their order
 * @param fields The fields and their order which are returned by the InputFormat
 * @return This InputFormat with specified return fields
 * @throws IOException if field projection fails
 */
public HCatInputFormatBase<T> getFields(String... fields) throws IOException;

Usage Examples:

// Java: Select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount");

// Java: Reorder fields
hCatFormat.getFields("total_amount", "customer_id", "order_date");
// Scala: Select specific fields
hCatFormat.getFields("customer_id", "order_date", "total_amount")

// Scala: Reorder fields  
hCatFormat.getFields("total_amount", "customer_id", "order_date")

Partition Filtering

Apply SQL-like filter conditions on partition columns to significantly reduce the amount of data to be read.

/**
 * Specifies a SQL-like filter condition on the table's partition columns
 * Filter conditions on non-partition columns are invalid
 * @param filter A SQL-like filter condition on the table's partition columns
 * @return This InputFormat with specified partition filter
 * @throws IOException if filter application fails
 */
public HCatInputFormatBase<T> withFilter(String filter) throws IOException;

Usage Examples:

// Java: Single partition filter
hCatFormat.withFilter("year=2023");

// Java: Multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15");

// Java: Range and comparison operators
hCatFormat.withFilter("year>=2020 AND region IN ('US', 'EU')");
// Scala: Single partition filter
hCatFormat.withFilter("year=2023")

// Scala: Multiple partition conditions
hCatFormat.withFilter("year=2023 AND month='12' AND day>=15")

Tuple Conversion

Convert HCatRecord output to native Flink or Scala tuples for improved type safety and performance.

/**
 * Specifies that the InputFormat returns Flink tuples instead of HCatRecord
 * Note: Flink tuples support only a limited number of fields (25 for the Java API, 22 for the Scala API)
 * @return This InputFormat configured to return tuples
 * @throws HCatException if tuple conversion setup fails
 */
public HCatInputFormatBase<T> asFlinkTuples() throws HCatException;

Usage Examples:

// Java: Convert to Flink Tuple (up to 25 fields)
// getFields() and asFlinkTuples() return the base type, so declare the variable as HCatInputFormatBase
HCatInputFormatBase<Tuple2<String, Integer>> tupleFormat =
    new HCatInputFormat<Tuple2<String, Integer>>("mydb", "mytable")
        .getFields("name", "age")
        .asFlinkTuples();

// Scala: Convert to Scala tuple (up to 22 fields)
val tupleFormat = new HCatInputFormat[(String, Int)]("mydb", "mytable")
  .getFields("name", "age")
  .asFlinkTuples()
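
Once the format returns tuples, downstream operators work with typed fields directly. A brief Java sketch, reusing tupleFormat from the Java example above and the execution environment from Basic Usage:

// Typed access: no casting from Object, field types are checked at compile time
DataSet<Tuple2<String, Integer>> people = env.createInput(tupleFormat);
people.filter(t -> t.f1 != null && t.f1 >= 18)
      .map(t -> t.f0)
      .returns(String.class)
      .print();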

Configuration Access

Access and modify the underlying Hadoop configuration for advanced customization.

/**
 * Returns the Hadoop Configuration of the HCatInputFormat
 * @return The Configuration of the HCatInputFormat
 */
public Configuration getConfiguration();

Usage Example:

// Access configuration for customization
Configuration config = hCatFormat.getConfiguration();
config.set("hive.metastore.uris", "thrift://metastore:9083");
config.setInt("mapreduce.input.fileinputformat.split.minsize", 1024000);

Schema Information

Retrieve schema information for the HCatalog table being read.

/**
 * Returns the HCatSchema of the HCatRecord returned by this InputFormat
 * @return The HCatSchema of the HCatRecords returned by this InputFormat
 */
public HCatSchema getOutputSchema();

Usage Example:

// Inspect table schema
HCatSchema schema = hCatFormat.getOutputSchema();
List<HCatFieldSchema> fields = schema.getFields();
for (HCatFieldSchema field : fields) {
    System.out.println("Field: " + field.getName() + ", Type: " + field.getType());
}
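
The schema also enables name-based field access when processing records. A short sketch, assuming a STRING column called name and reusing env from the Basic Usage example:

// Read a field by name instead of position using the output schema
HCatSchema schema = hCatFormat.getOutputSchema();
DataSet<HCatRecord> records = env.createInput(hCatFormat);
records.map(r -> (String) r.get("name", schema))
       .returns(String.class)
       .print();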

Types

/**
 * Abstract base class for HCatalog input formats
 * Provides common functionality for reading from HCatalog tables
 */
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
        implements ResultTypeQueryable<T> {
    
    protected HCatInputFormatBase();
    protected HCatInputFormatBase(String database, String table) throws IOException;
    protected HCatInputFormatBase(String database, String table, Configuration config) throws IOException;
    
    // Abstract methods to be implemented by language-specific subclasses
    protected abstract int getMaxFlinkTupleSize();
    protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
}

/**
 * Hadoop input split wrapper for Flink integration
 */
class HadoopInputSplit implements InputSplit {
    // Implementation details handled internally
}

/**
 * HCatalog record representing a row of data from Hive table
 * Contains field values accessible by name or position
 */
interface HCatRecord {
    Object get(String fieldName, HCatSchema schema);
    Object get(int fieldPos);
    List<Object> getAll();
    // Additional methods for data access
}

/**
 * Schema definition for HCatalog table structure
 * Contains field definitions and metadata
 */
class HCatSchema {
    List<HCatFieldSchema> getFields();
    HCatFieldSchema get(String fieldName);
    HCatFieldSchema get(int position);
    int getPosition(String fieldName);
    List<String> getFieldNames();
    // Additional schema methods
}

/**
 * Individual field schema definition
 */
class HCatFieldSchema {
    String getName();
    Type getType();
    String getComment();
    // Additional field metadata methods
    
    enum Type {
        INT, TINYINT, SMALLINT, BIGINT, BOOLEAN, FLOAT, DOUBLE, 
        STRING, BINARY, ARRAY, MAP, STRUCT
    }
}

Supported Data Types

The connector provides automatic type mapping between HCatalog and Flink types:

| HCatalog Type | Flink Type                                             | Java Type           | Scala Type          |
|---------------|--------------------------------------------------------|---------------------|---------------------|
| INT           | BasicTypeInfo.INT_TYPE_INFO                            | Integer             | Int                 |
| TINYINT       | BasicTypeInfo.BYTE_TYPE_INFO                           | Byte                | Byte                |
| SMALLINT      | BasicTypeInfo.SHORT_TYPE_INFO                          | Short               | Short               |
| BIGINT        | BasicTypeInfo.LONG_TYPE_INFO                           | Long                | Long                |
| BOOLEAN       | BasicTypeInfo.BOOLEAN_TYPE_INFO                        | Boolean             | Boolean             |
| FLOAT         | BasicTypeInfo.FLOAT_TYPE_INFO                          | Float               | Float               |
| DOUBLE        | BasicTypeInfo.DOUBLE_TYPE_INFO                         | Double              | Double              |
| STRING        | BasicTypeInfo.STRING_TYPE_INFO                         | String              | String              |
| BINARY        | PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO  | byte[]              | Array[Byte]         |
| ARRAY         | GenericTypeInfo(List.class)                            | List<Object>        | List[Object]        |
| MAP           | GenericTypeInfo(Map.class)                             | Map<Object, Object> | Map[Object, Object] |
| STRUCT        | GenericTypeInfo(List.class)                            | List<Object>        | List[Object]        |
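
For example, a table whose projected columns are BIGINT, STRING, and DOUBLE maps to a Tuple3<Long, String, Double> when read as Flink tuples (database, table, and column names below are illustrative):

// BIGINT -> Long, STRING -> String, DOUBLE -> Double, per the mapping table above
HCatInputFormatBase<Tuple3<Long, String, Double>> typed =
    new HCatInputFormat<Tuple3<Long, String, Double>>("mydb", "mytable")
        .getFields("id", "name", "score")
        .asFlinkTuples();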

Error Handling

The connector handles various error conditions:

Configuration Errors:

  • IOException: Thrown for invalid database/table names, connection issues, or configuration problems
  • HCatException: Thrown for HCatalog-specific errors during schema access or filtering

Type Conversion Errors:

  • IllegalArgumentException: Thrown when requesting more fields than supported by tuple type (25 for Java, 22 for Scala)
  • RuntimeException: Thrown for unsupported partition key types (BINARY, ARRAY, MAP, STRUCT as partition keys)

Usage Errors:

  • Invalid filter conditions on non-partition columns will cause runtime failures
  • Requesting non-existent fields will result in schema validation errors

Example Error Handling:

try {
    HCatInputFormat<HCatRecord> format = new HCatInputFormat<>("mydb", "mytable");
    format.withFilter("year=2023");
    format.getFields("name", "age", "salary");
} catch (HCatException e) {
    // Handle HCatalog-specific errors (HCatException extends IOException, so catch it first)
    logger.error("HCatalog error: " + e.getMessage());
} catch (IOException e) {
    // Handle configuration or connection errors
    logger.error("Failed to configure HCatalog input: " + e.getMessage());
} catch (Exception e) {
    // The constructor is declared to throw Exception; handle any remaining errors here
    logger.error("Failed to create HCatalog input format: " + e.getMessage());
}