SQL parser component for Apache Flink that provides Hive dialect support for parsing Hive-specific DDL and DML statements
—
Type system extensions provide support for Hive-specific data types including enhanced STRUCT types with field comments and improved type specifications.
Enhanced STRUCT type specification with field names, types, and comments.
/**
* STRUCT type specification with field names, types, and comments
* Extends standard row type specification to support Hive-specific STRUCT syntax
*/
public class ExtendedHiveStructTypeNameSpec extends ExtendedSqlRowTypeNameSpec {
/**
* Creates enhanced STRUCT type specification for Hive
* @param pos Parser position information
* @param fieldNames List of field identifiers
* @param fieldTypes List of field data type specifications
* @param comments List of field comments (can contain null values)
* @throws ParseException if validation fails
*/
public ExtendedHiveStructTypeNameSpec(SqlParserPos pos, List<SqlIdentifier> fieldNames,
List<SqlDataTypeSpec> fieldTypes,
List<SqlCharStringLiteral> comments) throws ParseException;
}

Usage Examples:
// STRUCT type in table creation
String createTableWithStructSql = """
CREATE TABLE user_profile (
user_id BIGINT,
name STRING,
address STRUCT<
street: STRING COMMENT 'Street address',
city: STRING COMMENT 'City name',
state: STRING COMMENT 'State or province',
zip_code: STRING COMMENT 'Postal code',
country: STRING COMMENT 'Country code'
> COMMENT 'User address information',
preferences STRUCT<
language: STRING COMMENT 'Preferred language',
timezone: STRING COMMENT 'User timezone',
notifications: BOOLEAN COMMENT 'Enable notifications'
> COMMENT 'User preferences'
)
""";
// Nested STRUCT types
String nestedStructSql = """
CREATE TABLE order_data (
order_id BIGINT,
customer_info STRUCT<
basic: STRUCT<
name: STRING COMMENT 'Customer name',
email: STRING COMMENT 'Email address'
> COMMENT 'Basic customer information',
address: STRUCT<
billing: STRUCT<
street: STRING,
city: STRING,
country: STRING
> COMMENT 'Billing address',
shipping: STRUCT<
street: STRING,
city: STRING,
country: STRING
> COMMENT 'Shipping address'
> COMMENT 'Customer addresses'
> COMMENT 'Complete customer information'
)
""";
// Programmatic STRUCT type creation
// NOTE(review): ExtendedHiveStructTypeNameSpec's constructor declares
// `throws ParseException`, so this snippet must run inside a method that
// declares or catches ParseException.
SqlParserPos pos = SqlParserPos.ZERO;
// Field names
List<SqlIdentifier> fieldNames = List.of(
    new SqlIdentifier("name", pos),
    new SqlIdentifier("age", pos),
    new SqlIdentifier("email", pos)
);
// Field types
List<SqlDataTypeSpec> fieldTypes = List.of(
    new SqlDataTypeSpec(new SqlBasicTypeNameSpec(SqlTypeName.VARCHAR, 255, pos), pos),
    new SqlDataTypeSpec(new SqlBasicTypeNameSpec(SqlTypeName.INTEGER, pos), pos),
    new SqlDataTypeSpec(new SqlBasicTypeNameSpec(SqlTypeName.VARCHAR, 100, pos), pos)
);
// Field comments
// NOTE(review): List.of rejects null elements; if some fields have no comment,
// build this list with new ArrayList<>() and add(null) for the missing slots
// (the constructor is documented to accept null comment entries).
List<SqlCharStringLiteral> comments = List.of(
    SqlLiteral.createCharString("Full name", pos),
    SqlLiteral.createCharString("Age in years", pos),
    SqlLiteral.createCharString("Email address", pos)
);
// Create STRUCT type specification
ExtendedHiveStructTypeNameSpec structType = new ExtendedHiveStructTypeNameSpec(
    pos, fieldNames, fieldTypes, comments
);

Combine STRUCT with other complex types (ARRAY, MAP):
// STRUCT containing arrays and maps
String complexTypeTableSql = """
CREATE TABLE analytics_data (
event_id BIGINT,
user_data STRUCT<
user_id: BIGINT COMMENT 'Unique user identifier',
tags: ARRAY<STRING> COMMENT 'User tags',
attributes: MAP<STRING, STRING> COMMENT 'Dynamic attributes',
preferences: STRUCT<
categories: ARRAY<STRING> COMMENT 'Preferred categories',
settings: MAP<STRING, BOOLEAN> COMMENT 'Feature settings'
> COMMENT 'User preferences'
> COMMENT 'Complete user data structure',
session_data STRUCT<
session_id: STRING COMMENT 'Session identifier',
events: ARRAY<STRUCT<
timestamp: TIMESTAMP COMMENT 'Event timestamp',
event_type: STRING COMMENT 'Type of event',
properties: MAP<STRING, STRING> COMMENT 'Event properties'
>> COMMENT 'Session events'
> COMMENT 'Session information'
)
""";

Handle type conversion between Hive and Flink types:
// Type conversion examples
String typeConversionSql = """
CREATE TABLE type_conversion_example (
-- Hive TIMESTAMP -> Flink TIMESTAMP
event_time TIMESTAMP COMMENT 'Event timestamp',
-- Hive BINARY -> Flink BYTES
data_payload BINARY COMMENT 'Binary data payload',
-- Complex type with conversions
metadata STRUCT<
created_at: TIMESTAMP COMMENT 'Creation timestamp',
updated_at: TIMESTAMP COMMENT 'Last update timestamp',
checksum: BINARY COMMENT 'Data checksum',
tags: ARRAY<STRING> COMMENT 'Metadata tags'
> COMMENT 'Record metadata'
)
""";

Apply validation to complex types:
// Type validation in table creation
String validatedTypesTableSql = """
CREATE TABLE validated_data (
record_id BIGINT NOT NULL,
contact_info STRUCT<
email: STRING COMMENT 'Email address (validated format)',
phone: STRING COMMENT 'Phone number (validated format)',
address: STRUCT<
street: STRING COMMENT 'Street address',
city: STRING COMMENT 'City name',
postal_code: STRING COMMENT 'Postal code (validated format)'
> COMMENT 'Postal address'
> COMMENT 'Contact information with validation',
-- Constraints can be applied to complex type fields through CHECK constraints
CONSTRAINT valid_email CHECK (contact_info.email LIKE '%@%.%'),
CONSTRAINT valid_postal CHECK (LENGTH(contact_info.address.postal_code) >= 5)
)
""";

Utility methods for building complex type specifications:
/**
 * Utility methods for building Hive complex type specifications (STRUCT, ARRAY, MAP).
 *
 * <p>All factory methods are static; the class is not meant to be instantiated.
 */
public class HiveTypeSpecBuilder {

    /** Non-instantiable utility class. */
    private HiveTypeSpecBuilder() {
    }

    /**
     * Creates a STRUCT type specification with optional field comments.
     *
     * <p>NOTE(review): field order follows the iteration order of {@code fieldTypes}.
     * Pass an ordered map (e.g. {@code LinkedHashMap}) when field order matters;
     * {@code Map.of(...)} does not guarantee any iteration order.
     *
     * @param pos parser position attached to every created node
     * @param fieldTypes map of field name to Hive type keyword (e.g. "STRING", "BIGINT")
     * @param fieldComments map of field name to comment text; fields without an entry
     *     get a {@code null} comment slot (keeps comments aligned with fields)
     * @return the STRUCT type specification
     * @throws ParseException if the underlying spec rejects the field lists
     * @throws IllegalArgumentException if a type keyword is not supported
     */
    public static ExtendedHiveStructTypeNameSpec createStructType(
            SqlParserPos pos,
            Map<String, String> fieldTypes,
            Map<String, String> fieldComments) throws ParseException {
        List<SqlIdentifier> fieldNames = new ArrayList<>(fieldTypes.size());
        List<SqlDataTypeSpec> typeSpecs = new ArrayList<>(fieldTypes.size());
        List<SqlCharStringLiteral> comments = new ArrayList<>(fieldTypes.size());
        for (Map.Entry<String, String> entry : fieldTypes.entrySet()) {
            fieldNames.add(new SqlIdentifier(entry.getKey(), pos));
            // Parse type specification (simplified - real implementation would be more complex)
            typeSpecs.add(new SqlDataTypeSpec(parseTypeString(entry.getValue(), pos), pos));
            // A missing comment is recorded as null so the comment list stays
            // index-aligned with the field list.
            String comment = fieldComments.get(entry.getKey());
            comments.add(comment == null ? null : SqlLiteral.createCharString(comment, pos));
        }
        return new ExtendedHiveStructTypeNameSpec(pos, fieldNames, typeSpecs, comments);
    }

    /**
     * Creates an ARRAY type whose elements are the given STRUCT type.
     *
     * @param pos parser position attached to the created nodes
     * @param structSpec element type of the array
     * @return ARRAY&lt;STRUCT&lt;...&gt;&gt; data type specification
     */
    public static SqlDataTypeSpec createArrayOfStructType(SqlParserPos pos,
            ExtendedHiveStructTypeNameSpec structSpec) {
        ExtendedSqlCollectionTypeNameSpec arraySpec =
                new ExtendedSqlCollectionTypeNameSpec(SqlTypeName.ARRAY, pos, structSpec);
        return new SqlDataTypeSpec(arraySpec, pos);
    }

    /**
     * Creates a MAP type with the given key type and STRUCT values.
     *
     * @param pos parser position attached to the created nodes
     * @param keyType map key type specification
     * @param valueStructType map value STRUCT type specification
     * @return MAP&lt;key, STRUCT&lt;...&gt;&gt; data type specification
     */
    public static SqlDataTypeSpec createMapWithStructValuesType(SqlParserPos pos,
            SqlTypeNameSpec keyType,
            ExtendedHiveStructTypeNameSpec valueStructType) {
        SqlMapTypeNameSpec mapSpec = new SqlMapTypeNameSpec(pos, keyType, valueStructType);
        return new SqlDataTypeSpec(mapSpec, pos);
    }

    /**
     * Maps a Hive primitive type keyword to a Calcite type name spec.
     *
     * <p>Simplified: real Hive type parsing also covers parameterized types
     * (DECIMAL(p,s), VARCHAR(n), CHAR(n), ...) and complex types.
     *
     * @param typeString Hive type keyword, case-insensitive
     * @param pos parser position attached to the created node
     * @return the corresponding basic type name spec
     * @throws IllegalArgumentException for unsupported keywords
     */
    private static SqlTypeNameSpec parseTypeString(String typeString, SqlParserPos pos) {
        // Locale.ROOT avoids locale-sensitive case mapping (e.g. Turkish dotless i
        // would break "int" -> "INT" under tr-TR default locale).
        return switch (typeString.toUpperCase(java.util.Locale.ROOT)) {
            case "STRING" -> new SqlBasicTypeNameSpec(SqlTypeName.VARCHAR, pos);
            case "BIGINT" -> new SqlBasicTypeNameSpec(SqlTypeName.BIGINT, pos);
            case "INT", "INTEGER" -> new SqlBasicTypeNameSpec(SqlTypeName.INTEGER, pos);
            case "SMALLINT" -> new SqlBasicTypeNameSpec(SqlTypeName.SMALLINT, pos);
            case "TINYINT" -> new SqlBasicTypeNameSpec(SqlTypeName.TINYINT, pos);
            case "BOOLEAN" -> new SqlBasicTypeNameSpec(SqlTypeName.BOOLEAN, pos);
            case "FLOAT" -> new SqlBasicTypeNameSpec(SqlTypeName.FLOAT, pos);
            case "DOUBLE" -> new SqlBasicTypeNameSpec(SqlTypeName.DOUBLE, pos);
            case "DATE" -> new SqlBasicTypeNameSpec(SqlTypeName.DATE, pos);
            case "TIMESTAMP" -> new SqlBasicTypeNameSpec(SqlTypeName.TIMESTAMP, pos);
            default -> throw new IllegalArgumentException("Unsupported type: " + typeString);
        };
    }
}
// Usage examples
SqlParserPos pos = SqlParserPos.ZERO;
// Create user profile struct
// NOTE(review): Map.of has an unspecified iteration order, so the resulting
// STRUCT field order is nondeterministic; use a LinkedHashMap when the field
// order of the generated type matters. createStructType also declares
// `throws ParseException`, which the enclosing method must handle.
Map<String, String> userFields = Map.of(
    "user_id", "BIGINT",
    "username", "STRING",
    "email", "STRING",
    "active", "BOOLEAN"
);
Map<String, String> userComments = Map.of(
    "user_id", "Unique user identifier",
    "username", "Login username",
    "email", "User email address",
    "active", "Account active status"
);
ExtendedHiveStructTypeNameSpec userProfileType = HiveTypeSpecBuilder.createStructType(
    pos, userFields, userComments
);
// Create array of user profiles
SqlDataTypeSpec userArrayType = HiveTypeSpecBuilder.createArrayOfStructType(pos, userProfileType);
// Create map with user profile values
SqlTypeNameSpec stringKeyType = new SqlBasicTypeNameSpec(SqlTypeName.VARCHAR, pos);
SqlDataTypeSpec userMapType = HiveTypeSpecBuilder.createMapWithStructValuesType(
    pos, stringKeyType, userProfileType
);

Extract type information from complex type specifications:
public class HiveTypeInspector {
/**
 * Extracts field information from a STRUCT type specification.
 *
 * <p>NOTE(review): placeholder — this always returns an empty list. A real
 * implementation would read field names, types, and comments from the spec's
 * internal structure.
 *
 * @param structType the STRUCT type specification to inspect
 * @return descriptors for each struct field (currently always empty)
 */
public static List<FieldInfo> extractStructFields(ExtendedHiveStructTypeNameSpec structType) {
    List<FieldInfo> fields = new ArrayList<>();
    // Access field information (implementation would depend on internal structure)
    // This is a conceptual example
    return fields;
}
/**
 * Validates STRUCT type compatibility.
 *
 * <p>WARNING(review): placeholder — this always returns {@code true}, so
 * incompatible STRUCT types are currently reported as compatible. Do not rely
 * on this check until the field-by-field comparison (names, types, nullability)
 * is implemented.
 *
 * @param type1 first STRUCT type specification
 * @param type2 second STRUCT type specification
 * @return whether the types are assignment-compatible (currently always true)
 */
public static boolean areStructTypesCompatible(ExtendedHiveStructTypeNameSpec type1,
    ExtendedHiveStructTypeNameSpec type2) {
    // Implementation would compare field names, types, and nullability
    // Return true if types are assignment-compatible
    return true; // Placeholder
}
/**
 * Generates a DDL string representation of a complex type specification.
 *
 * <p>NOTE(review): placeholder — this always returns the empty string. A real
 * implementation would recursively render STRUCT/ARRAY/MAP combinations.
 *
 * @param typeSpec the type specification to render
 * @return DDL text for the type (currently always empty)
 */
public static String generateTypeDDL(SqlDataTypeSpec typeSpec) {
    StringBuilder ddl = new StringBuilder();
    // Generate DDL string representation
    // Implementation would handle all complex type combinations
    return ddl.toString();
}
/**
 * Immutable descriptor of a single STRUCT field: identifier, type text,
 * optional comment, and nullability.
 */
public static class FieldInfo {
    private final String fieldName;
    private final String fieldType;
    private final String fieldComment;
    private final boolean fieldNullable;

    /**
     * Creates a field descriptor.
     *
     * @param name field identifier
     * @param type field type rendered as text
     * @param comment field comment, may be {@code null} when absent
     * @param nullable whether the field accepts NULL values
     */
    public FieldInfo(String name, String type, String comment, boolean nullable) {
        this.fieldName = name;
        this.fieldType = type;
        this.fieldComment = comment;
        this.fieldNullable = nullable;
    }

    /** Returns the field identifier. */
    public String getName() {
        return fieldName;
    }

    /** Returns the field type rendered as text. */
    public String getType() {
        return fieldType;
    }

    /** Returns the field comment, or {@code null} when absent. */
    public String getComment() {
        return fieldComment;
    }

    /** Returns whether the field accepts NULL values. */
    public boolean isNullable() {
        return fieldNullable;
    }
}
}

Store complex type information in Hive metastore:
// Type metadata is automatically stored in Hive metastore
String createTableWithMetadataSql = """
CREATE TABLE complex_data (
record_id BIGINT COMMENT 'Primary key',
data_structure STRUCT<
header: STRUCT<
version: STRING COMMENT 'Data format version',
timestamp: TIMESTAMP COMMENT 'Data creation time'
> COMMENT 'Data header information',
payload: STRUCT<
content: STRING COMMENT 'Main data content',
metadata: MAP<STRING, STRING> COMMENT 'Additional metadata'
> COMMENT 'Data payload'
> COMMENT 'Complete data structure'
)
TBLPROPERTIES (
'type_system_version' = '2.0',
'schema_evolution_enabled' = 'true'
)
""";

Handle schema evolution for complex types:
// Schema evolution example
String evolveStructTypeSql = """
-- Original table
CREATE TABLE user_events_v1 (
event_id BIGINT,
user_data STRUCT<
user_id: BIGINT,
name: STRING
>
);
-- Evolved table with additional fields
CREATE TABLE user_events_v2 (
event_id BIGINT,
user_data STRUCT<
user_id: BIGINT,
name: STRING,
email: STRING COMMENT 'Added in v2',
preferences: STRUCT<
language: STRING,
timezone: STRING
> COMMENT 'Added in v2'
>
);
""";

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-sql-parser-hive