Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources
—
Apache Hive HCatalog metadata integration for Flink batch processing, enabling access to Hive tables with schema support, partition filtering, and automatic type mapping.
Abstract base InputFormat for reading from HCatalog tables with comprehensive configuration options.
/**
 * Abstract base InputFormat for reading from HCatalog tables.
 *
 * <p>The configuration methods ({@code getFields}, {@code withFilter},
 * {@code asFlinkTuples}) return {@code this}, so they can be chained fluently.
 *
 * @param <T> The type of records produced (typically HCatRecord or Flink Tuple)
 */
public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit>
implements ResultTypeQueryable<T> {
/**
 * Default constructor using default database and table from context.
 */
public HCatInputFormatBase();
/**
 * Creates HCatInputFormatBase for a specific database and table.
 * @param database Name of the Hive database
 * @param table Name of the Hive table
 */
public HCatInputFormatBase(String database, String table);
/**
 * Creates HCatInputFormatBase with custom Hadoop configuration.
 * @param database Name of the Hive database
 * @param table Name of the Hive table
 * @param config Hadoop Configuration with HCatalog settings
 */
public HCatInputFormatBase(String database, String table, Configuration config);
/**
 * Specifies which fields to return and their order.
 * NOTE: despite the "get" prefix this is a setter-style configuration method;
 * the name matches the upstream Flink API.
 * @param fields Array of field names to include in the output
 * @return This instance for method chaining
 */
public HCatInputFormatBase<T> getFields(String... fields);
/**
 * Specifies partition filter condition for partition pruning.
 * @param filter Partition filter expression (e.g., "year=2023 AND month=12")
 * @return This instance for method chaining
 */
public HCatInputFormatBase<T> withFilter(String filter);
/**
 * Configures the format to return Flink tuples instead of HCatRecord.
 * The number of selected fields must not exceed {@link #getMaxFlinkTupleSize()}.
 * @return This instance for method chaining
 */
public HCatInputFormatBase<T> asFlinkTuples();
/**
 * Returns the Hadoop Configuration used by this format.
 * @return Hadoop Configuration instance
 */
public Configuration getConfiguration();
/**
 * Returns the HCatalog schema for the output data.
 * @return HCatSchema describing the table structure
 */
public HCatSchema getOutputSchema();
/**
 * Returns the type information for the records produced by this format.
 * @return TypeInformation describing the output type
 */
public TypeInformation<T> getProducedType();
/**
 * Returns the maximum tuple size supported by this format implementation.
 * Subclasses define the specific limit (e.g., 25 for standard Java API).
 * @return Maximum number of fields supported in Flink tuples
 */
protected abstract int getMaxFlinkTupleSize();
/**
 * Builds a Flink tuple from an HCatRecord.
 * @param t The tuple instance to populate (may be reused)
 * @param record The HCatRecord containing the data
 * @return Populated Flink tuple
 */
protected abstract T buildFlinkTuple(T t, HCatRecord record);
}

Concrete HCatalog InputFormat for Java API with support for up to 25 tuple fields.
/**
 * Concrete HCatalog InputFormat for the Java API; supports Flink tuples with
 * at most 25 fields (Tuple1 through Tuple25).
 *
 * <p>All configuration methods ({@code getFields}, {@code withFilter},
 * {@code asFlinkTuples}) are inherited from {@code HCatInputFormatBase}.
 *
 * @param <T> The Flink tuple type (Tuple1 through Tuple25)
 */
public class HCatInputFormat<T> extends HCatInputFormatBase<T> {
/**
 * Default constructor using default database and table from context.
 */
public HCatInputFormat();
/**
 * Creates HCatInputFormat for a specific database and table.
 * @param database Name of the Hive database
 * @param table Name of the Hive table
 */
public HCatInputFormat(String database, String table);
/**
 * Creates HCatInputFormat with custom Hadoop configuration.
 * @param database Name of the Hive database
 * @param table Name of the Hive table
 * @param config Hadoop Configuration with HCatalog settings
 */
public HCatInputFormat(String database, String table, Configuration config);
}

Basic Usage Example:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hcatalog.java.HCatInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read from Hive table
// NOTE: the tuple arity and field types must match the getFields() selection
// below (3 fields -> Tuple3), in the same order.
HCatInputFormat<Tuple3<String, Integer, String>> hcatInput =
new HCatInputFormat<>("mydb", "users");
// Configure which fields to read
hcatInput
.getFields("name", "age", "city") // Select specific columns
.withFilter("age > 18 AND city='New York'") // Filter partitions
.asFlinkTuples(); // Return as Flink tuples
DataSet<Tuple3<String, Integer, String>> users = env.createInput(hcatInput);
users.print();
Advanced Configuration Example:
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.api.java.tuple.Tuple5;
// Tuple2 and Aggregations are required by the aggregation pipeline below
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.aggregation.Aggregations;
// Configure Hadoop/Hive settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("hive.metastore.uris", "thrift://localhost:9083");
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");
// Create input format with custom configuration
HCatInputFormat<Tuple5<String, Integer, String, Double, Long>> salesInput =
new HCatInputFormat<>("sales_db", "transactions", hadoopConfig);
// Configure for complex query
salesInput
.getFields("customer_id", "quantity", "product", "amount", "timestamp")
.withFilter("year=2023 AND month>=10 AND region='US'") // Partition pruning
.asFlinkTuples();
DataSet<Tuple5<String, Integer, String, Double, Long>> sales = env.createInput(salesInput);
// Process sales data
DataSet<Tuple2<String, Double>> customerTotals = sales
.groupBy(0) // Group by customer_id
.aggregate(Aggregations.SUM, 3) // Sum amounts (field 3 = amount)
.project(0, 3); // Keep customer_id and total
customerTotals.print();
HCatalog automatically maps Hive types to Flink types:
// Hive Schema -> Flink Types
// STRING -> String
// INT -> Integer
// BIGINT -> Long
// DOUBLE -> Double
// BOOLEAN -> Boolean
// ARRAY<T> -> List<T>
// MAP<K,V> -> Map<K,V>
// STRUCT -> Complex types (limited support)
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
// For tables with complex types, you may need custom processing
public class ComplexHCatInputFormat extends HCatInputFormatBase<Tuple4<String, List<String>, Map<String, Integer>, String>> {
    public ComplexHCatInputFormat(String database, String table) {
        super(database, table);
    }
    /** Maximum tuple width of the standard Java API (Tuple1 through Tuple25). */
    @Override
    protected int getMaxFlinkTupleSize() {
        return 25;
    }
    /**
     * Maps one HCatRecord onto a Tuple4 of (id, tags, metrics, status).
     * Note: a new tuple is allocated; the reusable parameter {@code t} is not used.
     */
    @Override
    protected Tuple4<String, List<String>, Map<String, Integer>, String> buildFlinkTuple(
            Tuple4<String, List<String>, Map<String, Integer>, String> t,
            HCatRecord record) {
        // HCatRecord has no get(String) overload; name-based access requires the
        // record schema: get(String fieldName, HCatSchema schema). Positional
        // access via get(int) is the schema-free alternative.
        HCatSchema schema = getOutputSchema();
        try {
            // Extract primitive fields
            String id = (String) record.get("id", schema);
            String status = (String) record.get("status", schema);
            // Extract complex fields
            @SuppressWarnings("unchecked")
            List<String> tags = (List<String>) record.get("tags", schema);
            @SuppressWarnings("unchecked")
            Map<String, Integer> metrics = (Map<String, Integer>) record.get("metrics", schema);
            return new Tuple4<>(id, tags, metrics, status);
        } catch (HCatException e) {
            // get(String, HCatSchema) throws a checked HCatException; rethrow
            // unchecked with the cause preserved.
            throw new IllegalStateException("Failed to read fields from HCatRecord", e);
        }
    }
}

Efficient partition pruning reduces data processing overhead:
// Partition filter examples
// NOTE(review): successive withFilter calls likely replace (not combine with)
// the previous filter — each line below is an independent example; verify
// against the connector implementation before chaining filters.
hcatInput.withFilter("year=2023"); // Single partition
hcatInput.withFilter("year=2023 AND month=12"); // Multiple partitions
hcatInput.withFilter("year>=2022 AND region IN ('US','EU')"); // Range and set filters
hcatInput.withFilter("year=2023 AND month>=6 AND month<=12"); // Range filters
import org.apache.flink.api.java.tuple.Tuple6;
// Read from partitioned table with partition columns included
// Partition columns (region, year, month) must be listed in getFields()
// for their values to appear in the output tuples.
HCatInputFormat<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedInput =
new HCatInputFormat<>("warehouse", "sales_partitioned");
partitionedInput
.getFields("customer", "amount", "product", "region", "year", "month") // Include partition columns
.withFilter("year=2023 AND region IN ('US','EU')")
.asFlinkTuples();
DataSet<Tuple6<String, Integer, String, String, Integer, Integer>> partitionedSales =
env.createInput(partitionedInput);
// Partition information is available in the data
partitionedSales
// f4 (Integer) unboxes for == against the int literal, so this is safe
.filter(tuple -> tuple.f4 == 2023 && tuple.f5 >= 10) // Additional filtering by partition columns
.print();
// Configure input split size for better parallelism
Configuration config = new Configuration();
config.setLong("mapreduce.input.fileinputformat.split.maxsize", 134217728L); // 128MB splits
// NOTE(review): hive.exec.reducers.max controls Hive reducers, not input
// splits — it likely has no effect on this InputFormat; verify before relying on it.
config.setInt("hive.exec.reducers.max", 200); // Maximum number of reducers
HCatInputFormat<Tuple3<String, Double, String>> optimizedInput =
new HCatInputFormat<>("bigdata", "large_table", config);

// For ORC or Parquet tables, specify columns for projection pushdown
hcatInput
.getFields("id", "name", "amount") // Only read needed columns
.withFilter("year=2023"); // Partition pruning
// This enables:
// - Column projection (reduces I/O)
// - Partition pruning (reduces data scanned)
// - Predicate pushdown (when supported by storage format)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
// Handle potential null values and type conversion errors
DataSet<Tuple3<String, Integer, String>> safeUsers = users
.map(new MapFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>>() {
@Override
public Tuple3<String, Integer, String> map(Tuple3<String, Integer, String> value) {
// Handle null values by substituting defaults
String name = value.f0 != null ? value.f0 : "Unknown";
Integer age = value.f1 != null ? value.f1 : 0;
String city = value.f2 != null ? value.f2 : "Unknown";
return new Tuple3<>(name, age, city);
}
});

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; // Java has no "import ... as" aliasing; use Flink's wrapper class directly
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import java.util.List;
import java.util.Map;