Bidirectional conversion layer between Spark and Hive data representations, handling complex nested types and Hive SerDe integration. The HiveInspectors trait provides the core functionality for converting data between Spark's internal representation and Hive's ObjectInspector system.
Core trait providing data conversion between Spark and Hive representations.
/**
* Converts between Spark and Hive data representations
* Handles complex nested types and SerDe integration
*/
trait HiveInspectors {
// Core conversion methods implemented by the trait
}
Convert between Spark DataTypes and Hive ObjectInspectors/TypeInfo.
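The unqualified calls in the examples below (toInspector, wrapperFor, unwrapperFor, and so on) are methods on the trait, so they assume an enclosing scope that mixes in HiveInspectors; a minimal sketch, assuming the trait is visible from your code:
object Converters extends HiveInspectors
import Converters._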
/**
* Convert Java type to Spark DataType
* @param clz Java Type to convert
* @return Corresponding Spark DataType
*/
def javaTypeToDataType(clz: Type): DataType
/**
* Convert Hive ObjectInspector to Spark DataType
* @param inspector Hive ObjectInspector
* @return Corresponding Spark DataType
*/
def inspectorToDataType(inspector: ObjectInspector): DataType
/**
* Convert Spark DataType to Hive ObjectInspector
* @param dataType Spark DataType
* @return Corresponding Hive ObjectInspector
*/
def toInspector(dataType: DataType): ObjectInspector
/**
* Convert Spark Expression to Hive ObjectInspector
* @param expr Spark Expression
* @return Corresponding Hive ObjectInspector
*/
def toInspector(expr: Expression): ObjectInspector
Usage Examples:
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
// Convert Spark type to Hive ObjectInspector
val stringType = StringType
val stringInspector = toInspector(stringType)
// Convert Hive ObjectInspector back to Spark type
val convertedType = inspectorToDataType(stringInspector)
assert(convertedType == StringType)
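// javaTypeToDataType maps Java reflection types to Spark types
// (a sketch: java.lang.Class implements java.lang.reflect.Type)
val fromJava = javaTypeToDataType(classOf[java.lang.String])
assert(fromJava == StringType)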
// Handle complex types
val structType = StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, true)
))
val structInspector = toInspector(structType)
Convert actual data values between Spark and Hive representations.
/**
* Create wrapper function for converting Spark data to Hive representation
* @param oi Hive ObjectInspector for the target type
* @param dataType Spark DataType of the source data
* @return Function that converts Spark values to Hive values
*/
def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
/**
* Create unwrapper function for converting Hive data to Spark representation
* @param objectInspector Hive ObjectInspector for the source type
* @return Function that converts Hive values to Spark values
*/
def unwrapperFor(objectInspector: ObjectInspector): Any => Any
/**
* Create unwrapper function for struct fields
* @param field Hive struct field definition
* @return Function that unwraps struct field values
*/
def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit
/**
* Wrap a value from Spark to Hive representation
* @param a Value to wrap
* @param oi Target Hive ObjectInspector
* @param dataType Source Spark DataType
* @return Wrapped value suitable for Hive
*/
def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef
Usage Examples:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
// Create wrapper for converting Spark strings to Hive
val stringInspector = toInspector(StringType)
val wrapper = wrapperFor(stringInspector, StringType)
// Convert Spark UTF8String to Hive representation
val sparkString = UTF8String.fromString("hello")
val hiveString = wrapper(sparkString)
// Create unwrapper for the reverse conversion
val unwrapper = unwrapperFor(stringInspector)
val backToSpark = unwrapper(hiveString)
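// Round-trip conversion should restore the original value (assuming the standard string inspector)
assert(backToSpark == sparkString)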
// Direct wrapping
val directlyWrapped = wrap(sparkString, stringInspector, StringType)
Handle complex nested types like arrays, maps, and structs.
// Array type conversion example
val arrayType = ArrayType(IntegerType)
val arrayInspector = toInspector(arrayType)
val arrayWrapper = wrapperFor(arrayInspector, arrayType)
// Map type conversion example
val mapType = MapType(StringType, IntegerType)
val mapInspector = toInspector(mapType)
val mapWrapper = wrapperFor(mapInspector, mapType)
// Struct type conversion example
val structType = StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, true),
StructField("scores", ArrayType(DoubleType), true)
))
val structInspector = toInspector(structType)
val structWrapper = wrapperFor(structInspector, structType)
Usage Example for Complex Types:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.unsafe.types.UTF8String
// Convert array data
val arrayType = ArrayType(StringType)
val arrayInspector = toInspector(arrayType)
val arrayWrapper = wrapperFor(arrayInspector, arrayType)
val sparkArray = ArrayData.toArrayData(Array(
UTF8String.fromString("a"),
UTF8String.fromString("b")
))
val hiveArray = arrayWrapper(sparkArray)
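// Convert map data (a sketch: ArrayBasedMapData/GenericArrayData as the Spark-side map representation)
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
val mapType = MapType(StringType, IntegerType)
val mapWrapper = wrapperFor(toInspector(mapType), mapType)
val sparkMap = new ArrayBasedMapData(
  new GenericArrayData(Array[Any](UTF8String.fromString("k"))),
  new GenericArrayData(Array[Any](1)))
val hiveMap = mapWrapper(sparkMap)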
// Convert struct data
val structType = StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, true)
))
val structInspector = toInspector(structType)
val structWrapper = wrapperFor(structInspector, structType)
val sparkRow = InternalRow(1, UTF8String.fromString("Alice"))
val hiveStruct = structWrapper(sparkRow)
Convenient implicit class for DataType to TypeInfo conversion.
/**
* Implicit class providing convenient type conversion methods
* @param dt Spark DataType
*/
implicit class typeInfoConversions(dt: DataType) {
/**
* Convert Spark DataType to Hive TypeInfo
* @return Corresponding Hive TypeInfo
*/
def toTypeInfo: TypeInfo
}
Usage Example:
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
// Use implicit conversion
val sparkType: DataType = StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, true)
))
// Implicit conversion to TypeInfo
val typeInfo: TypeInfo = sparkType.toTypeInfo
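// typeInfo.getTypeName is expected to be "struct<id:int,name:string>" under Hive's standard type naming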
// Can also be used inline
def processTypeInfo(ti: TypeInfo): Unit = {
println(s"Processing type: ${ti.getTypeName}")
}
processTypeInfo(IntegerType.toTypeInfo)
processTypeInfo(StringType.toTypeInfo)
Standard mappings between Spark and Hive primitive types.
// Spark Type -> Hive TypeInfo mappings
BooleanType // -> BOOLEAN
ByteType // -> TINYINT
ShortType // -> SMALLINT
IntegerType // -> INT
LongType // -> BIGINT
FloatType // -> FLOAT
DoubleType // -> DOUBLE
StringType // -> STRING
BinaryType // -> BINARY
DateType // -> DATE
TimestampType // -> TIMESTAMP
DecimalType // -> DECIMAL(precision, scale)
Usage Example:
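DecimalType carries its precision and scale into the Hive type; a minimal round-trip sketch:
val decimalType = DecimalType(10, 2)
val decimalInspector = toInspector(decimalType)
// Precision and scale are expected to survive the round trip
val roundTripped = inspectorToDataType(decimalInspector)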
// Check type compatibility
val supportedTypes = Seq(
BooleanType, ByteType, ShortType, IntegerType,
LongType, FloatType, DoubleType, StringType,
BinaryType, DateType, TimestampType
)
supportedTypes.foreach { sparkType =>
val inspector = toInspector(sparkType)
val backToSpark = inspectorToDataType(inspector)
assert(sparkType == backToSpark, s"Round-trip failed for $sparkType")
}
Integration with Hive SerDe (Serializer/Deserializer) system.
// Example of deserializing data through a SerDe's ObjectInspector
import org.apache.hadoop.hive.serde2.AbstractSerDe
def processSerDeData(data: Any, serDe: AbstractSerDe): InternalRow = {
  // The SerDe exposes the ObjectInspector that describes its deserialized output
  val deInspector = serDe.getObjectInspector
  // Create an unwrapper for converting Hive data to Spark's representation
  val unwrapper = unwrapperFor(deInspector)
  // For a struct inspector, the unwrapped value is an InternalRow
  unwrapper(data).asInstanceOf[InternalRow]
}
Common error patterns and handling in type conversion.
// Type conversion may throw for types Hive cannot represent
try {
  // CustomUDT is a hypothetical user-defined type standing in for an unsupported DataType
  val inspector = toInspector(new CustomUDT())
} catch {
  case _: UnsupportedOperationException =>
    println("Type not supported for Hive conversion")
  case _: IllegalArgumentException =>
    println("Invalid type configuration")
}
Optimization tips for data conversion operations.
// Cache wrappers and unwrappers for repeated use
class CachedConverter(schema: StructType) extends HiveInspectors {
private val inspector = toInspector(schema)
private val wrapper = wrapperFor(inspector, schema)
private val unwrapper = unwrapperFor(inspector)
def toHive(row: InternalRow): Any = wrapper(row)
def fromHive(hiveData: Any): Any = unwrapper(hiveData)
}
// Use for batch operations (schema: StructType and sparkRows: Seq[InternalRow] assumed in scope)
val converter = new CachedConverter(schema)
val convertedRows = sparkRows.map(converter.toHive)
Type conversion in the context of Hive UDF execution.
// Example modeled on Spark's HiveGenericUDF wrapper
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.{DeferredJavaObject, DeferredObject}
def evaluateUDF(
    udf: GenericUDF,
    inputs: Seq[Any],
    inputTypes: Seq[DataType]): Any = {
  // Convert input types to Hive ObjectInspectors
  val inputInspectors = inputTypes.map(toInspector)
  // initialize must run before evaluate; it also yields the output inspector
  val outputInspector = udf.initialize(inputInspectors.toArray)
  // Wrap each Spark value into its Hive representation
  val wrappers = inputInspectors.zip(inputTypes).map {
    case (inspector, dataType) => wrapperFor(inspector, dataType)
  }
  // GenericUDF.evaluate expects DeferredObject arguments
  val hiveInputs = inputs.zip(wrappers).map {
    case (input, wrapper) => new DeferredJavaObject(wrapper(input)): DeferredObject
  }.toArray
  // Execute the UDF
  val hiveResult = udf.evaluate(hiveInputs)
  // Convert the result back to Spark's representation
  val unwrapper = unwrapperFor(outputInspector)
  unwrapper(hiveResult)
}
Usage Example:
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
// Set up UDF evaluation with proper type conversion
val inputTypes = Seq(StringType, IntegerType)
val inputs = Seq(UTF8String.fromString("test"), 42)
// myGenericUDF: a GenericUDF instance assumed to be constructed elsewhere
val result = evaluateUDF(myGenericUDF, inputs, inputTypes)
println(s"UDF result: $result")