Data Type Conversion

Utilities for converting between Hive and Catalyst data types, handling ObjectInspectors, and managing SerDe operations. This enables seamless data exchange between Hive and Spark SQL systems.

Capabilities

HiveInspectors Trait

Core trait providing conversion utilities between Hive ObjectInspectors and Catalyst data types.

/**
 * Trait for converting between Hive and Catalyst data formats
 * Handles ObjectInspector transformations and type mappings
 */
trait HiveInspectors {
  
  /**
   * Convert Java reflection Type to Catalyst DataType
   * @param clz - Java Type to convert
   * @return Corresponding Catalyst DataType
   */
  def javaTypeToDataType(clz: Type): DataType
  
  /**
   * Create wrapper function for converting Catalyst data to Hive format
   * @param oi - Hive ObjectInspector defining target format
   * @param dataType - Source Catalyst DataType  
   * @return Function to convert Any value from Catalyst to Hive format
   */
  def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
  
  /**
   * Create unwrapper function for converting Hive data to Catalyst format
   * @param objectInspector - Hive ObjectInspector defining source format
   * @return Function to convert Any value from Hive to Catalyst format
   */
  def unwrapperFor(objectInspector: ObjectInspector): Any => Any
  
  /**
   * Convert Catalyst DataType to Hive ObjectInspector
   * @param dataType - Catalyst DataType to convert
   * @return Corresponding Hive ObjectInspector
   */
  def toInspector(dataType: DataType): ObjectInspector
  
  /**
   * Convert Hive ObjectInspector to Catalyst DataType
   * @param inspector - Hive ObjectInspector to convert
   * @return Corresponding Catalyst DataType
   */
  def inspectorToDataType(inspector: ObjectInspector): DataType
}

Basic Type Conversions

Conversions between basic Hive and Catalyst data types.

Primitive Type Mappings:

// String types
StringType <-> PrimitiveObjectInspectorFactory.javaStringObjectInspector

// Numeric types  
IntegerType <-> PrimitiveObjectInspectorFactory.javaIntObjectInspector
LongType <-> PrimitiveObjectInspectorFactory.javaLongObjectInspector
DoubleType <-> PrimitiveObjectInspectorFactory.javaDoubleObjectInspector
FloatType <-> PrimitiveObjectInspectorFactory.javaFloatObjectInspector
BooleanType <-> PrimitiveObjectInspectorFactory.javaBooleanObjectInspector

// Binary types
BinaryType <-> PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector

// Date/Time types  
DateType <-> PrimitiveObjectInspectorFactory.javaDateObjectInspector
TimestampType <-> PrimitiveObjectInspectorFactory.javaTimestampObjectInspector

// Decimal types
DecimalType <-> PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector

Usage Examples:

import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

// Example implementation using HiveInspectors
class MyHiveProcessor extends HiveInspectors {
  
  // Convert Catalyst schema to Hive ObjectInspector
  def createHiveInspector(schema: StructType): ObjectInspector = {
    toInspector(schema)
  }
  
  // Convert Hive result to Catalyst format
  def convertFromHive(value: Any, inspector: ObjectInspector): Any = {
    val unwrapper = unwrapperFor(inspector)
    unwrapper(value)
  }
  
  // Convert Catalyst data to Hive format
  def convertToHive(value: Any, dataType: DataType, inspector: ObjectInspector): Any = {
    val wrapper = wrapperFor(inspector, dataType)  
    wrapper(value)
  }
}
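For instance, a single string value can round-trip through these helpers. This is a minimal sketch; note that Catalyst represents strings internally as UTF8String, so the value handed to convertToHive must already be in that form:

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.spark.unsafe.types.UTF8String

val processor = new MyHiveProcessor()
val stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector

// Catalyst string -> Hive value -> back to Catalyst's UTF8String
val hiveValue = processor.convertToHive(UTF8String.fromString("hello"), StringType, stringOI)
val backToCatalyst = processor.convertFromHive(hiveValue, stringOI)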

Complex Type Conversions

Handling complex data types like arrays, maps, and structs.

Array Type Conversion:

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._

// Array of strings: ArrayType(StringType) <-> ListObjectInspector
// (these calls assume a class that mixes in HiveInspectors, as above)
val arrayType = ArrayType(StringType, containsNull = true)
val arrayInspector = toInspector(arrayType)

// Usage example: Catalyst represents arrays as ArrayData and strings as UTF8String
val catalystArray = new GenericArrayData(
  Array[Any](UTF8String.fromString("hello"), UTF8String.fromString("world"), null))
val hiveArray = wrapperFor(arrayInspector, arrayType)(catalystArray)

// Convert back
val convertedBack = unwrapperFor(arrayInspector)(hiveArray)

Map Type Conversion:

// Map type: MapType(StringType, IntegerType) <-> MapObjectInspector
val mapType = MapType(StringType, IntegerType, valueContainsNull = true)
val mapInspector = toInspector(mapType)

// Usage example: Catalyst represents maps as MapData (parallel key and value arrays)
val catalystMap = new ArrayBasedMapData(
  new GenericArrayData(Array[Any](UTF8String.fromString("key1"),
    UTF8String.fromString("key2"), UTF8String.fromString("key3"))),
  new GenericArrayData(Array[Any](1, 2, null)))
val hiveMap = wrapperFor(mapInspector, mapType)(catalystMap)
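Converting back mirrors the array case; the unwrapper yields Catalyst's internal MapData (a sketch, under the same HiveInspectors mixin assumption):

// Convert back; the result is Catalyst's MapData representation
val convertedMap = unwrapperFor(mapInspector)(hiveMap)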

Struct Type Conversion:

// Struct type: StructType <-> StructObjectInspector
val structType = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = true),
  StructField("active", BooleanType, nullable = false)
))
val structInspector = toInspector(structType)

// Usage example: struct values are passed as InternalRow holding internal field types
import org.apache.spark.sql.catalyst.InternalRow
val catalystRow = InternalRow.fromSeq(Seq(UTF8String.fromString("Alice"), 25, true))
val hiveStruct = wrapperFor(structInspector, structType)(catalystRow)
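Unwrapping reverses the conversion and returns an InternalRow whose fields can be read by position (a sketch; the getters come from Spark's InternalRow API):

// Convert back and read fields positionally from the resulting InternalRow
val rowBack = unwrapperFor(structInspector)(hiveStruct).asInstanceOf[InternalRow]
val name = rowBack.getUTF8String(0)   // UTF8String "Alice"
val age = rowBack.getInt(1)           // 25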

Hive SerDe Integration

Integration with Hive Serializer/Deserializer (SerDe) classes.

Working with LazySimpleSerDe:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import java.util.Properties

// Configure SerDe properties, including the column layout the SerDe should expose
val serdeProps = new Properties()
serdeProps.setProperty("columns", "id,name")          // example column names
serdeProps.setProperty("columns.types", "int,string") // example column types
serdeProps.setProperty("field.delim", "\t")
serdeProps.setProperty("line.delim", "\n")
serdeProps.setProperty("serialization.format", "\t")

// Create and initialize the SerDe instance
val serde = new LazySimpleSerDe()
serde.initialize(new Configuration(), serdeProps)

// Get ObjectInspector from SerDe
val serdeInspector = serde.getObjectInspector()

// Convert to Catalyst type
val catalystType = inspectorToDataType(serdeInspector)

Working with JSON SerDe:

import org.apache.hive.hcatalog.data.JsonSerDe

// JsonSerDe also needs the column layout to build its ObjectInspector
val jsonSerdeProps = new Properties()
jsonSerdeProps.setProperty("columns", "id,name")          // example column names
jsonSerdeProps.setProperty("columns.types", "int,string") // example column types
jsonSerdeProps.setProperty("serialization.format", "1")

val jsonSerde = new JsonSerDe()
jsonSerde.initialize(new Configuration(), jsonSerdeProps)

// Handle JSON data conversion
val jsonInspector = jsonSerde.getObjectInspector()
val jsonDataType = inspectorToDataType(jsonInspector)

Type Inference and Schema Evolution

Utilities for schema inference and evolution.

object HiveUtils {
  /**
   * Infer schema for Hive table from metastore information
   * @param table - CatalogTable to infer schema for
   * @return Table with inferred schema
   */
  def inferSchema(table: CatalogTable): CatalogTable
}

Schema Inference Examples:

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.types._

// Get table from metastore
val client: HiveClient = // obtain HiveClient instance
val table = client.getTable("my_db", "my_table")

// Infer schema from Hive metadata
val tableWithSchema = HiveUtils.inferSchema(table)
println(s"Inferred schema: ${tableWithSchema.schema}")

// Handle schema evolution
val originalSchema = tableWithSchema.schema
val newField = StructField("new_column", StringType, nullable = true)
val evolvedSchema = StructType(originalSchema.fields :+ newField)
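Because CatalogTable is a case class, the evolved schema can then be attached with copy (a minimal sketch; persisting the change back to the metastore is a separate step):

// Attach the evolved schema to the catalog entry
val evolvedTable = tableWithSchema.copy(schema = evolvedSchema)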

Advanced Type Handling

Handling special Hive types and edge cases.

Decimal Type Handling:

import org.apache.spark.sql.types.{Decimal, DecimalType}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable

// Precise decimal conversion
val preciseDecimal = DecimalType(18, 2)  // precision=18, scale=2
val decimalInspector = toInspector(preciseDecimal)

// Convert Catalyst Decimal to Hive
val catalystDecimal = Decimal("123456789.99")  
val hiveDecimal = wrapperFor(decimalInspector, preciseDecimal)(catalystDecimal)

// Convert back
val convertedDecimal = unwrapperFor(decimalInspector)(hiveDecimal)

Date/Time Type Handling:

import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}

// Date conversion: Catalyst stores dates internally as days since the epoch (Int)
val dateType = DateType
val dateInspector = toInspector(dateType)

val catalystDate = DateTimeUtils.fromJavaDate(Date.valueOf("2023-12-25"))
val hiveDateWritable = wrapperFor(dateInspector, dateType)(catalystDate)

// Timestamp conversion: internally a Long holding microseconds since the epoch
val timestampType = TimestampType
val timestampInspector = toInspector(timestampType)

val catalystTimestamp = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2023-12-25 10:30:00"))
val hiveTimestampWritable = wrapperFor(timestampInspector, timestampType)(catalystTimestamp)

Binary Data Handling:

// Binary type conversion
val binaryType = BinaryType
val binaryInspector = toInspector(binaryType)

val catalystBinary = "Hello World".getBytes("UTF-8")
val hiveBinary = wrapperFor(binaryInspector, binaryType)(catalystBinary)
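Unwrapping returns the raw byte array, which can then be decoded back to text (a sketch under the same HiveInspectors mixin assumption):

// Convert back to Catalyst's Array[Byte] representation
val bytesBack = unwrapperFor(binaryInspector)(hiveBinary).asInstanceOf[Array[Byte]]
val text = new String(bytesBack, "UTF-8")   // "Hello World"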

Performance Optimization

Optimizations for type conversion performance.

Reusing Inspectors:

import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

class OptimizedConverter extends HiveInspectors {
  // Cache frequently used inspectors
  private val commonInspectors = Map(
    StringType -> toInspector(StringType),
    IntegerType -> toInspector(IntegerType),
    LongType -> toInspector(LongType),
    DoubleType -> toInspector(DoubleType)
  )
  
  def getInspector(dataType: DataType): ObjectInspector = {
    commonInspectors.getOrElse(dataType, toInspector(dataType))
  }
  
  // Cache wrapper functions for hot paths
  private val wrapperCache = collection.mutable.Map[(ObjectInspector, DataType), Any => Any]()
  
  def getCachedWrapper(oi: ObjectInspector, dataType: DataType): Any => Any = {
    wrapperCache.getOrElseUpdate((oi, dataType), wrapperFor(oi, dataType))
  }
}
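A usage sketch for the cached helpers; as in the earlier examples, the value passed to the wrapper uses Catalyst's internal UTF8String form:

import org.apache.spark.unsafe.types.UTF8String

val converter = new OptimizedConverter()
val stringOI = converter.getInspector(StringType)

// Repeated conversions reuse the same cached wrapper function
val wrap = converter.getCachedWrapper(stringOI, StringType)
val wrapped = wrap(UTF8String.fromString("cached"))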

Batch Conversion:

def convertBatch(
  data: Array[Any],
  sourceInspector: ObjectInspector
): Array[Any] = {
  // Build the unwrapper once and reuse it for every element
  val converter = unwrapperFor(sourceInspector)
  data.map(converter)
}

// Example usage
val hiveResults: Array[Any] = // ... from Hive query
val catalystResults = convertBatch(hiveResults, resultInspector)

Error Handling and Debugging

Common patterns for handling type conversion errors.

def safeTypeConversion[T](
  value: Any,
  converter: Any => Any,
  fallback: T
): T = {
  try {
    converter(value).asInstanceOf[T]
  } catch {
    case _: ClassCastException => 
      println(s"Type conversion failed for value: $value")
      fallback
    case _: NumberFormatException =>
      println(s"Number format error for value: $value") 
      fallback
    case e: Exception =>
      println(s"Unexpected conversion error: ${e.getMessage}")
      fallback
  }
}

// Usage
val result = safeTypeConversion[Any](
  hiveValue,
  unwrapperFor(inspector),
  null
)

Type Compatibility Checking:

def isTypeCompatible(hiveInspector: ObjectInspector, catalystType: DataType): Boolean = {
  try {
    val convertedType = inspectorToDataType(hiveInspector)
    // Check if types are compatible (allowing for widening conversions)
    (convertedType, catalystType) match {
      case (IntegerType, LongType) => true  // Int can be widened to Long
      case (FloatType, DoubleType) => true  // Float can be widened to Double
      case (a, b) => a == b
    }
  } catch {
    case _: Exception => false
  }
}
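For example, the check can guard a conversion before it runs (a sketch; the inspector placeholder stands in for whatever SerDe or UDF argument supplies it):

// Guard a conversion with the compatibility check first
val hiveInspector: ObjectInspector = // ... e.g. from a SerDe or a UDF argument
if (isTypeCompatible(hiveInspector, LongType)) {
  val unwrap = unwrapperFor(hiveInspector)
  // safe to convert values here
} else {
  println(s"Inspector type ${inspectorToDataType(hiveInspector)} is not compatible with LongType")
}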

Integration Examples

Complete examples showing type conversion in context.

UDF Type Integration:

import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

class MyCustomUDF extends HiveInspectors {
  def evaluate(input: Any, inputInspector: ObjectInspector): Any = {
    // Convert from Hive to Catalyst
    val catalystValue = unwrapperFor(inputInspector)(input)
    
    // Process in Catalyst format
    val result = processValue(catalystValue)
    
    // Convert back to Hive format; wrapperFor expects Catalyst's internal UTF8String
    val outputInspector = toInspector(StringType)
    wrapperFor(outputInspector, StringType)(UTF8String.fromString(result))
  }
  
  private def processValue(value: Any): String = {
    // Custom processing logic
    value.toString.toUpperCase
  }
}
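A quick usage sketch against a plain Java string inspector (the input value here is illustrative):

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

val udf = new MyCustomUDF()
val stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector

// "hello" is unwrapped to Catalyst, upper-cased, then wrapped back into a Hive value
val result = udf.evaluate("hello", stringOI)   // "HELLO"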

Table Scan Integration:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

def convertHiveTableScan(
  hiveResults: Iterator[Any],
  hiveInspector: StructObjectInspector
): Iterator[InternalRow] = {

  // Build the struct unwrapper once; it yields one InternalRow per Hive row
  val unwrapper = unwrapperFor(hiveInspector)

  hiveResults.map { hiveRow =>
    unwrapper(hiveRow).asInstanceOf[InternalRow]
  }
}