CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-batch-connectors

Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources

Pending
Overview
Eval results
Files

docs/hcatalog.md

HCatalog Connector

Apache Hive HCatalog metadata integration for Flink batch processing, enabling access to Hive tables with schema support, partition filtering, and automatic type mapping.

Capabilities

HCatInputFormatBase

Abstract base InputFormat for reading from HCatalog tables with comprehensive configuration options.

/**
 * Abstract base InputFormat for reading from HCatalog tables.
 *
 * <p>The configuration methods ({@link #getFields}, {@link #withFilter},
 * {@link #asFlinkTuples}) return {@code this} for chaining and must be called
 * before the format is handed to the execution environment.
 *
 * @param <T> The type of records produced (typically HCatRecord or Flink Tuple)
 */
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
        implements ResultTypeQueryable<T> {
    
    /**
     * Default constructor using default database and table from context
     */
    public HCatInputFormatBase();
    
    /**
     * Creates HCatInputFormatBase for a specific database and table
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     */
    public HCatInputFormatBase(String database, String table);
    
    /**
     * Creates HCatInputFormatBase with custom Hadoop configuration
     * @param database Name of the Hive database
     * @param table Name of the Hive table  
     * @param config Hadoop Configuration with HCatalog settings
     */
    public HCatInputFormatBase(String database, String table, Configuration config);
    
    /**
     * Specifies which fields to return and their order.
     * For columnar storage formats this enables projection pushdown.
     * @param fields Array of field names to include in the output
     * @return This instance for method chaining
     */
    public HCatInputFormatBase<T> getFields(String... fields);
    
    /**
     * Specifies partition filter condition for partition pruning.
     * NOTE(review): the filter is evaluated against partition columns in the
     * Hive metastore, not against regular data columns — verify filter
     * expressions reference partition keys only.
     * @param filter Partition filter expression (e.g., "year=2023 AND month=12")
     * @return This instance for method chaining
     */
    public HCatInputFormatBase<T> withFilter(String filter);
    
    /**
     * Configures the format to return Flink tuples instead of HCatRecord.
     * Only valid when the (projected) schema has at most
     * {@link #getMaxFlinkTupleSize()} fields.
     * @return This instance for method chaining
     */
    public HCatInputFormatBase<T> asFlinkTuples();
    
    /**
     * Returns the Hadoop Configuration used by this format
     * @return Hadoop Configuration instance
     */
    public Configuration getConfiguration();
    
    /**
     * Returns the HCatalog schema for the output data
     * @return HCatSchema describing the table structure
     */
    public HCatSchema getOutputSchema();
    
    /**
     * Returns the type information for the records produced by this format
     * @return TypeInformation describing the output type
     */
    public TypeInformation<T> getProducedType();
    
    /**
     * Returns the maximum tuple size supported by this format implementation
     * Subclasses define the specific limit (e.g., 25 for standard Java API)
     * @return Maximum number of fields supported in Flink tuples
     */
    protected abstract int getMaxFlinkTupleSize();
    
    /**
     * Builds a Flink tuple from an HCatRecord.
     *
     * <p>Declares {@code throws HCatException} because name-based field access
     * on {@code HCatRecord} ({@code get(String fieldName, HCatSchema schema)})
     * is a checked operation; the Flink base class declares it as well.
     *
     * @param t The tuple instance to populate (may be reused)
     * @param record The HCatRecord containing the data
     * @return Populated Flink tuple
     * @throws HCatException if a field cannot be read from the record
     */
    protected abstract T buildFlinkTuple(T t, HCatRecord record) throws HCatException;
}

HCatInputFormat

Concrete HCatalog InputFormat for Java API with support for up to 25 tuple fields.

/**
 * Concrete HCatalog InputFormat for the Flink Java API, supporting Flink
 * tuples with up to 25 fields (the standard Java API tuple limit).
 * Without a call to {@code asFlinkTuples()}, records are presumably emitted
 * as {@code HCatRecord} — confirm against the base class behavior.
 * @param <T> The Flink tuple type (Tuple1 through Tuple25)
 */
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
    
    /**
     * Default constructor using default database and table from context
     */
    public HCatInputFormat();
    
    /**
     * Creates HCatInputFormat for a specific database and table
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     */
    public HCatInputFormat(String database, String table);
    
    /**
     * Creates HCatInputFormat with custom Hadoop configuration
     * @param database Name of the Hive database
     * @param table Name of the Hive table
     * @param config Hadoop Configuration with HCatalog settings
     */
    public HCatInputFormat(String database, String table, Configuration config);
}

Basic Usage Example:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read from Hive table
HCatInputFormat<Tuple3<String, Integer, String>> hcatInput = 
    new HCatInputFormat<>("mydb", "users");

// Configure which fields to read
hcatInput
    .getFields("name", "age", "city")          // Select specific columns
    .withFilter("age > 18 AND city='New York'") // Partition filter — only effective if these are partition columns
    .asFlinkTuples();                          // Return as Flink tuples

DataSet<Tuple3<String, Integer, String>> users = env.createInput(hcatInput);
users.print();

Advanced Configuration Example:

import org.apache.hadoop.conf.Configuration;
import org.apache.flink.api.java.tuple.Tuple5;

// Configure Hadoop/Hive settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("hive.metastore.uris", "thrift://localhost:9083");
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");

// Create input format with custom configuration
HCatInputFormat<Tuple5<String, Integer, String, Double, Long>> salesInput = 
    new HCatInputFormat<>("sales_db", "transactions", hadoopConfig);

// Configure for complex query
salesInput
    .getFields("customer_id", "quantity", "product", "amount", "timestamp")
    .withFilter("year=2023 AND month>=10 AND region='US'")  // Partition pruning
    .asFlinkTuples();

DataSet<Tuple5<String, Integer, String, Double, Long>> sales = env.createInput(salesInput);

// Process sales data
DataSet<Tuple2<String, Double>> customerTotals = sales
    .groupBy(0)  // Group by customer_id
    .aggregate(Aggregations.SUM, 3)  // Sum amounts
    .project(0, 3);  // Keep customer_id and total

customerTotals.print();

Schema Handling

Automatic Type Mapping

HCatalog automatically maps Hive types to Flink types:

// Hive Schema -> Flink Types
// STRING -> String
// INT -> Integer  
// BIGINT -> Long
// DOUBLE -> Double
// BOOLEAN -> Boolean
// ARRAY<T> -> List<T>
// MAP<K,V> -> Map<K,V>
// STRUCT -> Complex types (limited support)

Working with Complex Types

import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.flink.api.java.tuple.Tuple4;

// For tables with complex types, you may need custom processing
public class ComplexHCatInputFormat extends HCatInputFormatBase<Tuple4<String, List<String>, Map<String, Integer>, String>> {
    
    public ComplexHCatInputFormat(String database, String table) {
        super(database, table);
    }
    
    @Override
    protected int getMaxFlinkTupleSize() {
        return 25;  // Standard Java API limit
    }
    
    @Override
    protected Tuple4<String, List<String>, Map<String, Integer>, String> buildFlinkTuple(
            Tuple4<String, List<String>, Map<String, Integer>, String> t, 
            HCatRecord record) {
        
        // Extract primitive fields
        String id = (String) record.get("id");
        String status = (String) record.get("status");
        
        // Extract complex fields
        @SuppressWarnings("unchecked")
        List<String> tags = (List<String>) record.get("tags");
        
        @SuppressWarnings("unchecked") 
        Map<String, Integer> metrics = (Map<String, Integer>) record.get("metrics");
        
        return new Tuple4<>(id, tags, metrics, status);
    }
}

Partition Management

Partition Filtering

Efficient partition pruning reduces data processing overhead:

// Partition filter examples
hcatInput.withFilter("year=2023");                           // Single partition
hcatInput.withFilter("year=2023 AND month=12");              // Multiple partitions
hcatInput.withFilter("year>=2022 AND region IN ('US','EU')"); // Range and set filters
hcatInput.withFilter("year=2023 AND month>=6 AND month<=12"); // Range filters

Dynamic Partition Discovery

import org.apache.flink.api.java.tuple.Tuple6;

// Read from partitioned table with partition columns included
HCatInputFormat<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedInput = 
    new HCatInputFormat<>("warehouse", "sales_partitioned");

partitionedInput
    .getFields("customer", "amount", "product", "region", "year", "month")  // Include partition columns
    .withFilter("year=2023 AND region IN ('US','EU')")
    .asFlinkTuples();

DataSet<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedSales = 
    env.createInput(partitionedInput);

// Partition information is available in the data
partitionedSales
    .filter(tuple -> tuple.f4 == 2023 && tuple.f5 >= 10)  // Additional filtering by partition columns
    .print();

Performance Optimization

Input Split Configuration

// Configure input split size for better parallelism
Configuration config = new Configuration();
config.setLong("mapreduce.input.fileinputformat.split.maxsize", 134217728L); // 128MB splits
config.setInt("hive.exec.reducers.max", 200);  // Maximum number of reducers

HCatInputFormat<Tuple3<String, Double, String>> optimizedInput = 
    new HCatInputFormat<>("bigdata", "large_table", config);

Columnar Storage Integration

// For ORC or Parquet tables, specify columns for projection pushdown
hcatInput
    .getFields("id", "name", "amount")  // Only read needed columns
    .withFilter("year=2023");           // Partition pruning

// This enables:
// - Column projection (reduces I/O)
// - Partition pruning (reduces data scanned)
// - Predicate pushdown (when supported by storage format)

Error Handling

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Handle potential null values and type conversion errors
DataSet<Tuple3<String, Integer, String>> safeUsers = users
    .map(new MapFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>>() {
        @Override
        public Tuple3<String, Integer, String> map(Tuple3<String, Integer, String> value) {
            // Handle null values
            String name = value.f0 != null ? value.f0 : "Unknown";
            Integer age = value.f1 != null ? value.f1 : 0;
            String city = value.f2 != null ? value.f2 : "Unknown";
            
            return new Tuple3<>(name, age, city);
        }
    });

Common Types

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import java.util.List;
import java.util.Map;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-batch-connectors

docs

avro.md

hadoop.md

hbase.md

hcatalog.md

index.md

jdbc.md

tile.json