Advanced partition handling utilities for efficient data access, partition pruning, and metadata management in partitioned Hive tables. These utilities provide comprehensive support for partition discovery, serialization, and manipulation in both batch and streaming scenarios.
Core data structure representing a single Hive table partition or, in the case of a non-partitioned table, the whole table.
/**
* Represents a Hive table partition or whole table
* Encapsulates storage descriptor and partition specification
* Used for partition-aware data processing and metadata operations
*/
@PublicEvolving
public class HiveTablePartition implements Serializable {
/**
* Create partition representation for non-partitioned table
* @param storageDescriptor Hive storage descriptor with location and format info
* @param tableParameters Table-level parameters and properties
* @return HiveTablePartition representing the whole table
*/
public static HiveTablePartition ofTable(StorageDescriptor storageDescriptor,
Map<String, String> tableParameters);
/**
* Create partition representation for partitioned table
* @param storageDescriptor Partition storage descriptor with location and format info
* @param partitionSpec Partition key-value specification (e.g., {year=2024, month=01})
* @param tableParameters Table-level parameters and properties
* @return HiveTablePartition representing the specific partition
*/
public static HiveTablePartition ofPartition(StorageDescriptor storageDescriptor,
Map<String, String> partitionSpec,
Map<String, String> tableParameters);
/**
* Get storage descriptor containing location, input/output formats, and SerDe info
* @return StorageDescriptor with partition storage details
*/
public StorageDescriptor getStorageDescriptor();
/**
* Get partition specification as key-value pairs
* @return Map of partition keys to values, empty for non-partitioned tables
*/
public Map<String, String> getPartitionSpec();
/**
* Get table-level parameters and properties
* @return Map of table parameters
*/
public Map<String, String> getTableParameters();
/**
* Check if this represents a partitioned table entry
* @return true if partition has non-empty partition specification
*/
public boolean isPartitioned();
/**
* Get partition location path
* @return String path to partition data location
*/
public String getLocation();
/**
* Get input format class name
* @return Input format class for reading partition data
*/
public String getInputFormat();
/**
* Get output format class name
* @return Output format class for writing partition data
*/
public String getOutputFormat();
/**
* Get SerDe (Serializer/Deserializer) class name
* @return SerDe class for data serialization
*/
public String getSerDe();
}
Usage Examples:
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import java.util.Map;
// Create partition for partitioned table
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setLocation("hdfs://namenode:9000/warehouse/events/year=2024/month=01");
partitionSd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
partitionSd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
Map<String, String> partitionSpec = Map.of(
"year", "2024",
"month", "01"
);
Map<String, String> tableParams = Map.of(
"table.type", "EXTERNAL_TABLE",
"transient_lastDdlTime", "1640995200"
);
HiveTablePartition partition = HiveTablePartition.ofPartition(
partitionSd,
partitionSpec,
tableParams
);
System.out.println("Partition location: " + partition.getLocation());
System.out.println("Partition spec: " + partition.getPartitionSpec());
System.out.println("Is partitioned: " + partition.isPartitioned());
// Create representation for non-partitioned table
StorageDescriptor tableSd = new StorageDescriptor();
tableSd.setLocation("hdfs://namenode:9000/warehouse/users");
tableSd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat");
HiveTablePartition wholeTable = HiveTablePartition.ofTable(tableSd, tableParams);
System.out.println("Table location: " + wholeTable.getLocation());
System.out.println("Is partitioned: " + wholeTable.isPartitioned()); // falseUtility class providing comprehensive partition management operations including discovery, serialization, and metadata handling.
Utility class providing comprehensive partition management operations including discovery, serialization, and metadata handling.
/**
* Utility class for Hive partition operations
* Provides methods for partition discovery, serialization, and metadata management
*/
public class HivePartitionUtils {
/**
* Get all partitions for a table, including metadata and storage descriptors
* @param jobConf Hadoop JobConf with Hive and HDFS configuration
* @param hiveVersion Hive version for compatibility
* @param tablePath Table path (database.table)
* @param partitionColNames List of partition column names for validation
* @param remainingPartitions List of remaining partition specifications to filter
* @return List of all table partitions with complete metadata
*/
public static List<HiveTablePartition> getAllPartitions(JobConf jobConf,
String hiveVersion,
ObjectPath tablePath,
List<String> partitionColNames,
List<Map<String, String>> remainingPartitions);
/**
* Serialize list of HiveTablePartition objects to list of byte arrays
* Used for efficient partition metadata transfer and caching
* @param partitions List of partitions to serialize
* @return List of serialized byte arrays, one per partition
*/
public static List<byte[]> serializeHiveTablePartition(List<HiveTablePartition> partitions);
/**
* Deserialize list of byte arrays back to list of HiveTablePartition objects
* @param partitionBytes List of serialized partition data
* @return List of deserialized partitions
*/
public static List<HiveTablePartition> deserializeHiveTablePartition(List<byte[]> partitionBytes);
}
Usage Examples:
import org.apache.flink.connectors.hive.util.HivePartitionUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.hadoop.mapred.JobConf;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
// Configure Hadoop JobConf
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://metastore:9083");
jobConf.set("fs.defaultFS", "hdfs://namenode:9000");
// Get all partitions for a table
ObjectPath tablePath = new ObjectPath("sales", "transactions");
List<String> partitionColumns = Arrays.asList("year", "month", "day");
List<HiveTablePartition> allPartitions = HivePartitionUtils.getAllPartitions(
jobConf,
"2.3.9", // hiveVersion
tablePath,
partitionColumns,
Collections.emptyList() // remainingPartitions
);
System.out.println("Found " + allPartitions.size() + " partitions");
// Filter partitions for specific date range
List<HiveTablePartition> filteredPartitions = allPartitions.stream()
.filter(p -> {
Map<String, String> spec = p.getPartitionSpec();
return "2024".equals(spec.get("year")) &&
Integer.parseInt(spec.get("month")) >= 1 &&
Integer.parseInt(spec.get("month")) <= 3;
})
.collect(Collectors.toList());
System.out.println("Q1 2024 partitions: " + filteredPartitions.size());
// Serialize partitions for caching
List<byte[]> serializedPartitions = HivePartitionUtils.serializeHiveTablePartition(filteredPartitions);
System.out.println("Serialized " + serializedPartitions.size() + " partitions");
// Deserialize partitions
List<HiveTablePartition> deserializedPartitions = HivePartitionUtils.deserializeHiveTablePartition(
serializedPartitions
);
System.out.println("Deserialized " + deserializedPartitions.size() + " partitions");