SQL parser component for Apache Flink that provides Hive dialect support for parsing Hive-specific DDL and DML statements
—
Partition management provides operations for adding, renaming, and managing Hive table partitions.
Add new partitions to an existing Hive table with optional location specifications.
/**
* ADD PARTITION statement for Hive tables
* Adds one or more partitions to an existing partitioned table
*/
public class SqlAddHivePartitions extends SqlCall {
/**
* Creates add partitions statement
* @param pos Parser position information
* @param tableName Name of the table to add partitions to
* @param ifNotExists Whether to use IF NOT EXISTS clause
* @param partSpecs List of partition specifications
* @param partLocations List of partition locations (can be null)
*/
public SqlAddHivePartitions(SqlParserPos pos, SqlIdentifier tableName, boolean ifNotExists,
List<SqlNodeList> partSpecs, List<SqlCharStringLiteral> partLocations);
}

Usage Examples:
// Add single partition
String addPartitionSql = """
ALTER TABLE sales_data
ADD PARTITION (year=2023, month=12)
""";
// Add partition with location
String addPartitionWithLocationSql = """
ALTER TABLE sales_data
ADD PARTITION (year=2023, month=12)
LOCATION '/data/sales/2023/12'
""";
// Add multiple partitions
String addMultiplePartitionsSql = """
ALTER TABLE sales_data
ADD IF NOT EXISTS
PARTITION (year=2023, month=11) LOCATION '/data/sales/2023/11'
PARTITION (year=2023, month=12) LOCATION '/data/sales/2023/12'
PARTITION (year=2024, month=1) LOCATION '/data/sales/2024/01'
""";
// Programmatic partition addition
SqlIdentifier tableName = new SqlIdentifier("analytics_data", SqlParserPos.ZERO);
// First partition specification: (date_partition='2023-12-01')
SqlNodeList partSpec1 = new SqlNodeList(SqlParserPos.ZERO);
partSpec1.add(new SqlTableOption("date_partition", "2023-12-01", SqlParserPos.ZERO));
// Second partition specification: (date_partition='2023-12-02')
SqlNodeList partSpec2 = new SqlNodeList(SqlParserPos.ZERO);
partSpec2.add(new SqlTableOption("date_partition", "2023-12-02", SqlParserPos.ZERO));
List<SqlNodeList> partSpecs = List.of(partSpec1, partSpec2);
List<SqlCharStringLiteral> locations = List.of(
SqlLiteral.createCharString("/data/analytics/2023-12-01", SqlParserPos.ZERO),
SqlLiteral.createCharString("/data/analytics/2023-12-02", SqlParserPos.ZERO)
);
SqlAddHivePartitions addPartitions = new SqlAddHivePartitions(
SqlParserPos.ZERO,
tableName,
true, // IF NOT EXISTS
partSpecs,
locations
);

Rename existing partitions by changing their partition values.
/**
* PARTITION RENAME statement for Hive tables
* Renames an existing partition by changing its partition specification
*/
public class SqlAlterHivePartitionRename extends SqlAlterHiveTable {
/**
* Creates partition rename statement
* @param pos Parser position information
* @param tableName Name of table containing the partition
* @param partSpec Current partition specification
* @param newPartSpec New partition specification
* @throws ParseException if validation fails
*/
public SqlAlterHivePartitionRename(SqlParserPos pos, SqlIdentifier tableName,
SqlNodeList partSpec, SqlNodeList newPartSpec) throws ParseException;
/**
* Gets the new partition specification
* @return SqlNodeList containing new partition values
*/
public SqlNodeList getNewPartSpec();
}

Usage Examples:
// Rename partition
String renamePartitionSql = """
ALTER TABLE sales_data
PARTITION (year=2023, month=13)
RENAME TO PARTITION (year=2024, month=1)
""";
// Programmatic partition rename
SqlIdentifier tableName = new SqlIdentifier("sales_data", SqlParserPos.ZERO);
// Current partition specification
SqlNodeList currentPartSpec = new SqlNodeList(SqlParserPos.ZERO);
currentPartSpec.add(new SqlTableOption("year", "2023", SqlParserPos.ZERO));
currentPartSpec.add(new SqlTableOption("month", "13", SqlParserPos.ZERO));
// New partition specification
SqlNodeList newPartSpec = new SqlNodeList(SqlParserPos.ZERO);
newPartSpec.add(new SqlTableOption("year", "2024", SqlParserPos.ZERO));
newPartSpec.add(new SqlTableOption("month", "1", SqlParserPos.ZERO));
SqlAlterHivePartitionRename renamePartition = new SqlAlterHivePartitionRename(
SqlParserPos.ZERO,
tableName,
currentPartSpec,
newPartSpec
);
// Access new partition specification
SqlNodeList newSpec = renamePartition.getNewPartSpec();

Many table alteration operations can be applied to specific partitions:
// Change partition location
String changePartLocationSql = """
ALTER TABLE sales_data
PARTITION (year=2023, month=12)
SET LOCATION '/new/data/sales/2023/12'
""";
// Change partition file format
String changePartFormatSql = """
ALTER TABLE sales_data
PARTITION (year=2023, month=12)
SET FILEFORMAT PARQUET
""";
// Change partition SerDe
String changePartSerdeSql = """
ALTER TABLE sales_data
PARTITION (year=2023, month=12)
SET SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
""";

Partitions are often created dynamically during INSERT operations:
// Dynamic partition insert
String dynamicInsertSql = """
INSERT INTO TABLE sales_data
PARTITION (year, month)
SELECT id, customer_id, product_name, amount, transaction_date,
YEAR(transaction_date) as year,
MONTH(transaction_date) as month
FROM raw_sales_data
""";

Partition specifications are validated during parsing:
try {
// This will validate partition column types and values
SqlAddHivePartitions addPartitions = new SqlAddHivePartitions(
pos, tableName, false, partSpecs, locations
);
} catch (ParseException e) {
// Thrown for invalid partition specifications
System.err.println("Invalid partition specification: " + e.getMessage());
}

Partition specifications follow specific format requirements:
// Valid partition specifications
String validPartitions = """
PARTITION (year=2023, month=12, day=15) -- Multiple columns
PARTITION (date_str='2023-12-15') -- String partition
PARTITION (year=2023) -- Single column
""";
// Partition specification building
/**
 * Builds a partition specification node list from column-name/value pairs.
 * Each map entry becomes one {@code SqlTableOption} (column = value).
 */
SqlNodeList buildPartitionSpec(Map<String, String> partitionValues) {
    SqlNodeList spec = new SqlNodeList(SqlParserPos.ZERO);
    // One SqlTableOption per partition column, in the map's iteration order.
    partitionValues.forEach((column, value) ->
        spec.add(new SqlTableOption(column, value, SqlParserPos.ZERO)));
    return spec;
}
// Usage
Map<String, String> partValues = Map.of(
"year", "2023",
"month", "12",
"region", "us-west"
);
SqlNodeList partSpec = buildPartitionSpec(partValues);

public class PartitionManager {
private SqlParser parser;
private TableEnvironment tableEnv;
/**
 * Creates a partition manager bound to the given table environment.
 * All SQL statements are executed through {@code tableEnv}.
 *
 * @param tableEnv Flink table environment used to execute DDL statements
 */
public PartitionManager(TableEnvironment tableEnv) {
this.tableEnv = tableEnv;
// Standalone Calcite parser configured with the Flink Hive-dialect factory.
// NOTE(review): this parser field is initialized but not used by any of the
// methods below — confirm it is needed, or remove it.
this.parser = SqlParser.create("",
SqlParser.config().withParserFactory(FlinkHiveSqlParserImpl.FACTORY));
}
/**
 * Registers one partition per requested month on the given table, each with a
 * conventional '/data/&lt;table&gt;/&lt;year&gt;/&lt;month&gt;' location.
 * A failure for one month is logged and does not abort the remaining months.
 *
 * @param tableName table to add partitions to
 * @param year      partition year value
 * @param months    month values to create partitions for
 */
public void addMonthlyPartitions(String tableName, int year, int[] months) {
    for (int m : months) {
        String sql = String.format("""
            ALTER TABLE %s
            ADD IF NOT EXISTS PARTITION (year=%d, month=%d)
            LOCATION '/data/%s/%d/%02d'
            """, tableName, year, m, tableName, year, m);
        try {
            tableEnv.executeSql(sql);
            System.out.println("Added partition: year=" + year + ", month=" + m);
        } catch (Exception e) {
            System.err.println("Failed to add partition: " + e.getMessage());
        }
    }
}
/**
 * First step of moving data from a misnamed partition to a corrected one:
 * creates the target partition if it does not already exist.
 *
 * <p>Only step 1 (ADD PARTITION) is executed here; moving the data and
 * dropping the old partition are deliberately left to the caller — see the
 * inline notes. The success message therefore reflects intent, not a
 * completed move.
 *
 * @param tableName    table whose partition is being reorganized
 * @param oldYearMonth source partition as "year-month" (used for logging only)
 * @param newYearMonth target partition as "year-month", e.g. "2024-1"
 */
public void reorganizePartition(String tableName, String oldYearMonth, String newYearMonth) {
    // Step 1: Add new partition. Only the target spec needs parsing here;
    // the previous code also split oldYearMonth into an unused local, which
    // has been removed.
    String[] newParts = newYearMonth.split("-");
    String addNewPartitionSql = String.format("""
        ALTER TABLE %s
        ADD IF NOT EXISTS PARTITION (year=%s, month=%s)
        """, tableName, newParts[0], newParts[1]);
    // Step 2: Move data (would be done outside parser, e.g., with INSERT INTO ... SELECT)
    // Step 3: Drop old partition (would use DROP PARTITION statement)
    try {
        tableEnv.executeSql(addNewPartitionSql);
        System.out.println("Reorganized partition from " + oldYearMonth + " to " + newYearMonth);
    } catch (Exception e) {
        System.err.println("Failed to reorganize partition: " + e.getMessage());
    }
}
/**
 * Points an existing partition at a new filesystem location via
 * {@code ALTER TABLE ... PARTITION (...) SET LOCATION '...'}.
 *
 * @param tableName     table containing the partition
 * @param partitionSpec partition column -&gt; value map, rendered as
 *                      comma-separated key=value pairs.
 *                      NOTE(review): values are emitted unquoted, so string
 *                      partition values may need quoting by the caller —
 *                      confirm against the Hive dialect grammar.
 * @param newLocation   new location path/URI for the partition
 */
public void updatePartitionLocation(String tableName, Map<String, String> partitionSpec, String newLocation) {
    StringBuilder partSpecSql = new StringBuilder("PARTITION (");
    // BUG FIX: the joined "key=value" pairs were previously computed but the
    // collect(...) result was never appended, so the statement always read
    // "PARTITION ()". Append the joined result to the builder.
    partSpecSql.append(partitionSpec.entrySet().stream()
        .map(entry -> entry.getKey() + "=" + entry.getValue())
        .collect(joining(", ")));
    partSpecSql.append(")");
    String updateLocationSql = String.format("""
        ALTER TABLE %s
        %s
        SET LOCATION '%s'
        """, tableName, partSpecSql.toString(), newLocation);
    try {
        tableEnv.executeSql(updateLocationSql);
        System.out.println("Updated partition location: " + newLocation);
    } catch (Exception e) {
        System.err.println("Failed to update partition location: " + e.getMessage());
    }
}
}
// Usage
PartitionManager partManager = new PartitionManager(tableEnv);
// Add quarterly partitions
partManager.addMonthlyPartitions("sales_data", 2024, new int[]{1, 2, 3});
// Reorganize misnamed partition
partManager.reorganizePartition("sales_data", "2023-13", "2024-1");
// Update partition location
Map<String, String> partSpec = Map.of("year", "2023", "month", "12");
partManager.updatePartitionLocation("sales_data", partSpec, "/data/archived/sales/2023/12");

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-parser-hive