Utilities for converting between Hive and Catalyst data types, handling ObjectInspectors, and managing SerDe operations. Together, these utilities let Spark SQL read data produced by Hive and hand data back to Hive APIs such as UDFs and SerDes.
Core trait providing conversion utilities between Hive ObjectInspectors and Catalyst data types.
/**
* Trait for converting between Hive and Catalyst data formats
* Handles ObjectInspector transformations and type mappings
*/
trait HiveInspectors {
/**
* Convert Java reflection Type to Catalyst DataType
* @param clz - Java Type to convert
* @return Corresponding Catalyst DataType
*/
def javaTypeToDataType(clz: Type): DataType
/**
* Create wrapper function for converting Catalyst data to Hive format
* @param oi - Hive ObjectInspector defining target format
* @param dataType - Source Catalyst DataType
* @return Function to convert Any value from Catalyst to Hive format
*/
def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
/**
* Create unwrapper function for converting Hive data to Catalyst format
* @param objectInspector - Hive ObjectInspector defining source format
* @return Function to convert Any value from Hive to Catalyst format
*/
def unwrapperFor(objectInspector: ObjectInspector): Any => Any
/**
* Convert Catalyst DataType to Hive ObjectInspector
* @param dataType - Catalyst DataType to convert
* @return Corresponding Hive ObjectInspector
*/
def toInspector(dataType: DataType): ObjectInspector
/**
* Convert Hive ObjectInspector to Catalyst DataType
* @param inspector - Hive ObjectInspector to convert
* @return Corresponding Catalyst DataType
*/
def inspectorToDataType(inspector: ObjectInspector): DataType
}
Conversions between basic Hive and Catalyst data types.
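The trait's javaTypeToDataType also handles Java reflection types (as seen when reflecting over Hive UDF method signatures). A brief sketch, assuming the calls run inside a class that mixes in HiveInspectors; the commented results show the expected mappings:
javaTypeToDataType(classOf[java.lang.String])   // StringType
javaTypeToDataType(classOf[java.lang.Integer])  // IntegerType
javaTypeToDataType(classOf[java.lang.Double])   // DoubleType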
Primitive Type Mappings:
// String types
StringType <-> PrimitiveObjectInspectorFactory.javaStringObjectInspector
// Numeric types
IntegerType <-> PrimitiveObjectInspectorFactory.javaIntObjectInspector
LongType <-> PrimitiveObjectInspectorFactory.javaLongObjectInspector
DoubleType <-> PrimitiveObjectInspectorFactory.javaDoubleObjectInspector
FloatType <-> PrimitiveObjectInspectorFactory.javaFloatObjectInspector
BooleanType <-> PrimitiveObjectInspectorFactory.javaBooleanObjectInspector
// Binary types
BinaryType <-> PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector
// Date/Time types
DateType <-> PrimitiveObjectInspectorFactory.javaDateObjectInspector
TimestampType <-> PrimitiveObjectInspectorFactory.javaTimestampObjectInspector
// Decimal types
DecimalType <-> PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector
Usage Examples:
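A minimal round trip over one of the primitive mappings above (a sketch: it assumes an enclosing class or object that mixes in HiveInspectors, and that values use Catalyst's internal representation, e.g. UTF8String for strings):
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String
val stringOI = toInspector(StringType)                   // javaStringObjectInspector
val toHive = wrapperFor(stringOI, StringType)
val toCatalyst = unwrapperFor(stringOI)
val hiveString = toHive(UTF8String.fromString("spark"))  // java.lang.String on the Hive side
val roundTripped = toCatalyst(hiveString)                // back to a Catalyst UTF8String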
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.types._
// Example implementation using HiveInspectors
class MyHiveProcessor extends HiveInspectors {
// Convert Catalyst schema to Hive ObjectInspector
def createHiveInspector(schema: StructType): ObjectInspector = {
toInspector(schema)
}
// Convert Hive result to Catalyst format
def convertFromHive(value: Any, inspector: ObjectInspector): Any = {
val unwrapper = unwrapperFor(inspector)
unwrapper(value)
}
// Convert Catalyst data to Hive format
def convertToHive(value: Any, dataType: DataType, inspector: ObjectInspector): Any = {
val wrapper = wrapperFor(inspector, dataType)
wrapper(value)
}
}
Handling complex data types like arrays, maps, and structs.
Array Type Conversion:
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
// Array of strings: ArrayType(StringType) <-> ListObjectInspector
val arrayType = ArrayType(StringType, containsNull = true)
val arrayInspector = toInspector(arrayType)
// Usage example
val catalystArray = Array("hello", "world", null)
val hiveArray = wrapperFor(arrayInspector, arrayType)(catalystArray)
// Convert back
val convertedBack = unwrapperFor(arrayInspector)(hiveArray)
Map Type Conversion:
// Map type: MapType(StringType, IntegerType) <-> MapObjectInspector
val mapType = MapType(StringType, IntegerType, valueContainsNull = true)
val mapInspector = toInspector(mapType)
// Usage example
val catalystMap = Map("key1" -> 1, "key2" -> 2, "key3" -> null)
val hiveMap = wrapperFor(mapInspector, mapType)(catalystMap)
Struct Type Conversion:
// Struct type: StructType <-> StructObjectInspector
val structType = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = true),
StructField("active", BooleanType, nullable = false)
))
val structInspector = toInspector(structType)
// Usage example
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
val catalystRow = InternalRow.fromSeq(Seq(UTF8String.fromString("Alice"), 25, true))
val hiveStruct = wrapperFor(structInspector, structType)(catalystRow)
Integration with Hive Serializer/Deserializer (SerDe) classes.
Working with LazySimpleSerDe:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import java.util.Properties
// Configure SerDe properties, including the column names and types
val serdeProps = new Properties()
serdeProps.setProperty("columns", "name,age")
serdeProps.setProperty("columns.types", "string:int")
serdeProps.setProperty("field.delim", "\t")
serdeProps.setProperty("line.delim", "\n")
serdeProps.setProperty("serialization.format", "\t")
// Create and initialize the SerDe instance
val serde = new LazySimpleSerDe()
serde.initialize(new Configuration(), serdeProps)
// Get ObjectInspector from SerDe
val serdeInspector = serde.getObjectInspector()
// Convert to Catalyst type
val catalystType = inspectorToDataType(serdeInspector)
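With the SerDe initialized and its inspector in hand, raw records can be deserialized and then unwrapped into Catalyst values. A sketch continuing the example above (the input line is illustrative):
import org.apache.hadoop.io.Text
// Deserialize one tab-delimited record, then unwrap it via the SerDe's inspector
val rawLine = new Text("Alice\t25")
val hiveRow = serde.deserialize(rawLine)
val unwrapRow = unwrapperFor(serdeInspector)
val catalystRow = unwrapRow(hiveRow) // Catalyst representation of the row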
Working with JSON SerDe:
import org.apache.hive.hcatalog.data.JsonSerDe
val jsonSerdeProps = new Properties()
jsonSerdeProps.setProperty("columns", "name,age")
jsonSerdeProps.setProperty("columns.types", "string:int")
jsonSerdeProps.setProperty("serialization.format", "1")
val jsonSerde = new JsonSerDe()
jsonSerde.initialize(new Configuration(), jsonSerdeProps)
// Handle JSON data conversion
val jsonInspector = jsonSerde.getObjectInspector()
val jsonDataType = inspectorToDataType(jsonInspector)
Utilities for schema inference and evolution.
object HiveUtils {
/**
* Infer schema for Hive table from metastore information
* @param table - CatalogTable to infer schema for
* @return Table with inferred schema
*/
def inferSchema(table: CatalogTable): CatalogTable
}
Schema Inference Examples:
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.types._
// Get table from metastore
val client: HiveClient = ??? // obtain a HiveClient instance
val table = client.getTable("my_db", "my_table")
// Infer schema from Hive metadata
val tableWithSchema = HiveUtils.inferSchema(table)
println(s"Inferred schema: ${tableWithSchema.schema}")
// Handle schema evolution
val originalSchema = tableWithSchema.schema
val newField = StructField("new_column", StringType, nullable = true)
val evolvedSchema = StructType(originalSchema.fields :+ newField)
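To put the evolved schema to use, it can be copied back into the table metadata and converted to an inspector. A sketch; the alterTable call is an assumption about the HiveClient API and is left commented out:
val evolvedTable = tableWithSchema.copy(schema = evolvedSchema)
// client.alterTable(evolvedTable) // assumed HiveClient method; verify before use
val evolvedInspector = toInspector(evolvedSchema)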
Handling special Hive types and edge cases.
Decimal Type Handling:
import org.apache.spark.sql.types.{Decimal, DecimalType}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
// Precise decimal conversion
val preciseDecimal = DecimalType(18, 2) // precision=18, scale=2
val decimalInspector = toInspector(preciseDecimal)
// Convert Catalyst Decimal to Hive
val catalystDecimal = Decimal("123456789.99")
val hiveDecimal = wrapperFor(decimalInspector, preciseDecimal)(catalystDecimal)
// Convert back
val convertedDecimal = unwrapperFor(decimalInspector)(hiveDecimal)
Date/Time Type Handling:
import java.sql.{Date, Timestamp}
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
// Date conversion
val dateType = DateType
val dateInspector = toInspector(dateType)
val catalystDate = Date.valueOf("2023-12-25")
val hiveDateWritable = wrapperFor(dateInspector, dateType)(catalystDate)
// Timestamp conversion
val timestampType = TimestampType
val timestampInspector = toInspector(timestampType)
val catalystTimestamp = Timestamp.valueOf("2023-12-25 10:30:00")
val hiveTimestampWritable = wrapperFor(timestampInspector, timestampType)(catalystTimestamp)
Binary Data Handling:
// Binary type conversion
val binaryType = BinaryType
val binaryInspector = toInspector(binaryType)
val catalystBinary = "Hello World".getBytes("UTF-8")
val hiveBinary = wrapperFor(binaryInspector, binaryType)(catalystBinary)
Optimizations for type conversion performance.
Reusing Inspectors:
class OptimizedConverter extends HiveInspectors {
// Cache frequently used inspectors
private val commonInspectors = Map(
StringType -> toInspector(StringType),
IntegerType -> toInspector(IntegerType),
LongType -> toInspector(LongType),
DoubleType -> toInspector(DoubleType)
)
def getInspector(dataType: DataType): ObjectInspector = {
commonInspectors.getOrElse(dataType, toInspector(dataType))
}
// Cache wrapper functions for hot paths
private val wrapperCache = collection.mutable.Map[(ObjectInspector, DataType), Any => Any]()
def getCachedWrapper(oi: ObjectInspector, dataType: DataType): Any => Any = {
wrapperCache.getOrElseUpdate((oi, dataType), wrapperFor(oi, dataType))
}
}
Batch Conversion:
def convertBatch(
data: Array[Any],
sourceInspector: ObjectInspector,
targetType: DataType
): Array[Any] = {
val converter = unwrapperFor(sourceInspector)
data.map(converter)
}
// Example usage (resultInspector and targetSchema obtained elsewhere)
val hiveResults: Array[Any] = ??? // ... results from a Hive query
val catalystResults = convertBatch(hiveResults, resultInspector, targetSchema)
Common patterns for handling type conversion errors.
import org.apache.spark.sql.catalyst.util.DateTimeUtils
def safeTypeConversion[T](
value: Any,
converter: Any => Any,
fallback: T
): T = {
try {
converter(value).asInstanceOf[T]
} catch {
case _: ClassCastException =>
println(s"Type conversion failed for value: $value")
fallback
case _: NumberFormatException =>
println(s"Number format error for value: $value")
fallback
case e: Exception =>
println(s"Unexpected conversion error: ${e.getMessage}")
fallback
}
}
// Usage
val result = safeTypeConversion(
hiveValue,
unwrapperFor(inspector),
null
)
Type Compatibility Checking:
def isTypeCompatible(hiveInspector: ObjectInspector, catalystType: DataType): Boolean = {
try {
val convertedType = inspectorToDataType(hiveInspector)
// Check if types are compatible (allowing for widening conversions)
(convertedType, catalystType) match {
case (IntegerType, LongType) => true // Int can be widened to Long
case (FloatType, DoubleType) => true // Float can be widened to Double
case (a, b) => a == b
}
} catch {
case _: Exception => false
}
}
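For example, checking that a Hive int column can be widened into a Catalyst LongType field (a hedged usage sketch):
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
val intToLongOk = isTypeCompatible(
  PrimitiveObjectInspectorFactory.javaIntObjectInspector, LongType) // true: Int widens to Long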
Complete examples showing type conversion in context.
UDF Type Integration:
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.types.StringType
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
class MyCustomUDF extends HiveInspectors {
def evaluate(input: Any, inputInspector: ObjectInspector): Any = {
// Convert from Hive to Catalyst
val catalystValue = unwrapperFor(inputInspector)(input)
// Process in Catalyst format
val result = processValue(catalystValue)
// Convert back to Hive format
val outputInspector = toInspector(StringType)
wrapperFor(outputInspector, StringType)(result)
}
private def processValue(value: Any): String = {
// Custom processing logic
value.toString.toUpperCase
}
}
Table Scan Integration:
def convertHiveTableScan(
hiveResults: Iterator[Any],
tableSchema: StructType,
hiveInspector: StructObjectInspector
): Iterator[InternalRow] = {
val unwrapper = unwrapperFor(hiveInspector)
hiveResults.map { hiveRow =>
val catalystRow = unwrapper(hiveRow)
catalystRow.asInstanceOf[InternalRow]
}
}
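A hedged usage sketch: the row iterator and schema below are placeholders, and the struct inspector is derived from the Catalyst schema with toInspector:
val tableSchema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
val hiveRows: Iterator[Any] = ??? // rows produced by a Hive record reader
val rowInspector = toInspector(tableSchema).asInstanceOf[StructObjectInspector]
val internalRows = convertHiveTableScan(hiveRows, tableSchema, rowInspector)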