# Data Type Conversion

Bidirectional conversion system between Spark and Hive data representations, handling complex nested types and Hive SerDe integration. The `HiveInspectors` trait provides the core functionality for converting data between Spark's internal representation and Hive's object inspection system.

## Capabilities

### HiveInspectors Trait

Core trait providing data conversion between Spark and Hive representations.

```scala { .api }
/**
 * Converts between Spark and Hive data representations
 * Handles complex nested types and SerDe integration
 */
trait HiveInspectors {
  // Core conversion methods implemented by the trait
}
```
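
Because the conversion methods live on a trait, they become available by mixing `HiveInspectors` into a class or object. A minimal sketch (note that in Spark the trait is package-private, so real code must sit in the `org.apache.spark.sql.hive` package or go through the built-in implementations; `TypeBridge` is a hypothetical name):

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.types._

// Hypothetical helper gaining the conversion methods via mix-in
object TypeBridge extends HiveInspectors {
  // Report Hive's name for a Spark DataType by round-tripping
  // through an ObjectInspector
  def hiveTypeNameOf(dt: DataType): String = toInspector(dt).getTypeName
}
```

With this in place, `TypeBridge.hiveTypeNameOf(IntegerType)` would report Hive's `int` type name.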

### Type Conversion Methods

Convert between Spark `DataType`s and Hive `ObjectInspector`s/`TypeInfo`.

```scala { .api }
/**
 * Convert Java type to Spark DataType
 * @param clz Java Type to convert
 * @return Corresponding Spark DataType
 */
def javaTypeToDataType(clz: Type): DataType

/**
 * Convert Hive ObjectInspector to Spark DataType
 * @param inspector Hive ObjectInspector
 * @return Corresponding Spark DataType
 */
def inspectorToDataType(inspector: ObjectInspector): DataType

/**
 * Convert Spark DataType to Hive ObjectInspector
 * @param dataType Spark DataType
 * @return Corresponding Hive ObjectInspector
 */
def toInspector(dataType: DataType): ObjectInspector

/**
 * Convert Spark Expression to Hive ObjectInspector
 * @param expr Spark Expression
 * @return Corresponding Hive ObjectInspector
 */
def toInspector(expr: Expression): ObjectInspector
```

**Usage Examples:**

```scala
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

// Convert Spark type to Hive ObjectInspector
val stringType = StringType
val stringInspector = toInspector(stringType)

// Convert Hive ObjectInspector back to Spark type
val convertedType = inspectorToDataType(stringInspector)
assert(convertedType == StringType)

// Handle complex types
val structType = StructType(Seq(
  StructField("id", IntegerType, false),
  StructField("name", StringType, true)
))
val structInspector = toInspector(structType)
```
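
`javaTypeToDataType` is the one method above that the examples don't exercise; it maps Java reflection types (such as those found in Hive UDF method signatures) to Spark types. A hedged sketch:

```scala
import java.lang.reflect.Method

// Derive the Spark type of a Java method's return value, as done
// when binding the signatures of simple Hive UDFs
val method: Method = classOf[String].getMethod("length")
val returnedType = javaTypeToDataType(method.getGenericReturnType)
// A primitive int return type corresponds to IntegerType
```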

### Data Wrapping and Unwrapping

Convert actual data values between Spark and Hive representations.

```scala { .api }
/**
 * Create wrapper function for converting Spark data to Hive representation
 * @param oi Hive ObjectInspector for the target type
 * @param dataType Spark DataType of the source data
 * @return Function that converts Spark values to Hive values
 */
def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any

/**
 * Create unwrapper function for converting Hive data to Spark representation
 * @param objectInspector Hive ObjectInspector for the source type
 * @return Function that converts Hive values to Spark values
 */
def unwrapperFor(objectInspector: ObjectInspector): Any => Any

/**
 * Create unwrapper function for struct fields
 * @param field Hive struct field definition
 * @return Function that unwraps struct field values
 */
def unwrapperFor(field: HiveStructField): (Any, InternalRow, Int) => Unit

/**
 * Wrap a value from Spark to Hive representation
 * @param a Value to wrap
 * @param oi Target Hive ObjectInspector
 * @param dataType Source Spark DataType
 * @return Wrapped value suitable for Hive
 */
def wrap(a: Any, oi: ObjectInspector, dataType: DataType): AnyRef
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// Create wrapper for converting Spark strings to Hive
val stringInspector = toInspector(StringType)
val wrapper = wrapperFor(stringInspector, StringType)

// Convert Spark UTF8String to Hive representation
val sparkString = UTF8String.fromString("hello")
val hiveString = wrapper(sparkString)

// Create unwrapper for the reverse conversion
val unwrapper = unwrapperFor(stringInspector)
val backToSpark = unwrapper(hiveString)

// Direct wrapping
val directlyWrapped = wrap(sparkString, stringInspector, StringType)
```

### Complex Type Handling

Handle complex nested types like arrays, maps, and structs.

```scala { .api }
// Array type conversion example
val arrayType = ArrayType(IntegerType)
val arrayInspector = toInspector(arrayType)
val arrayWrapper = wrapperFor(arrayInspector, arrayType)

// Map type conversion example
val mapType = MapType(StringType, IntegerType)
val mapInspector = toInspector(mapType)
val mapWrapper = wrapperFor(mapInspector, mapType)

// Struct type conversion example
val structType = StructType(Seq(
  StructField("id", IntegerType, false),
  StructField("name", StringType, true),
  StructField("scores", ArrayType(DoubleType), true)
))
val structInspector = toInspector(structType)
val structWrapper = wrapperFor(structInspector, structType)
```

**Usage Example for Complex Types:**

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.unsafe.types.UTF8String

// Convert array data
val arrayType = ArrayType(StringType)
val arrayInspector = toInspector(arrayType)
val arrayWrapper = wrapperFor(arrayInspector, arrayType)

val sparkArray = ArrayData.toArrayData(Array(
  UTF8String.fromString("a"),
  UTF8String.fromString("b")
))
val hiveArray = arrayWrapper(sparkArray)

// Convert struct data
val structType = StructType(Seq(
  StructField("id", IntegerType, false),
  StructField("name", StringType, true)
))
val structInspector = toInspector(structType)
val structWrapper = wrapperFor(structInspector, structType)

val sparkRow = InternalRow(1, UTF8String.fromString("Alice"))
val hiveStruct = structWrapper(sparkRow)
```
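
Map data follows the same wrapper pattern, converting Spark's `MapData` into a Java map on the Hive side. A sketch reusing the imports above (`ArrayBasedMapData` is the catalyst-internal map implementation):

```scala
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData}

// Convert map data: Spark MapData -> Hive map representation
val mapType = MapType(StringType, IntegerType)
val mapInspector = toInspector(mapType)
val mapWrapper = wrapperFor(mapInspector, mapType)

val sparkMap = new ArrayBasedMapData(
  ArrayData.toArrayData(Array(UTF8String.fromString("a"), UTF8String.fromString("b"))),
  ArrayData.toArrayData(Array(1, 2))
)
val hiveMap = mapWrapper(sparkMap)
```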

### Implicit Type Conversions

Convenient implicit class for `DataType`-to-`TypeInfo` conversion.

```scala { .api }
/**
 * Implicit class providing convenient type conversion methods
 * @param dt Spark DataType
 */
implicit class typeInfoConversions(dt: DataType) {
  /**
   * Convert Spark DataType to Hive TypeInfo
   * @return Corresponding Hive TypeInfo
   */
  def toTypeInfo: TypeInfo
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.types._
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo

// Use implicit conversion
val sparkType: DataType = StructType(Seq(
  StructField("id", IntegerType, false),
  StructField("name", StringType, true)
))

// Implicit conversion to TypeInfo
val typeInfo: TypeInfo = sparkType.toTypeInfo

// Can also be used inline
def processTypeInfo(ti: TypeInfo): Unit = {
  println(s"Processing type: ${ti.getTypeName}")
}

processTypeInfo(IntegerType.toTypeInfo)
processTypeInfo(StringType.toTypeInfo)
```

### Primitive Type Mappings

Standard mappings between Spark and Hive primitive types.

```scala { .api }
// Spark Type   -> Hive TypeInfo mappings
BooleanType      // -> BOOLEAN
ByteType         // -> TINYINT
ShortType        // -> SMALLINT
IntegerType      // -> INT
LongType         // -> BIGINT
FloatType        // -> FLOAT
DoubleType       // -> DOUBLE
StringType       // -> STRING
BinaryType       // -> BINARY
DateType         // -> DATE
TimestampType    // -> TIMESTAMP
DecimalType      // -> DECIMAL(precision, scale)
```

**Usage Example:**

```scala
// Check type compatibility with a round-trip conversion
val supportedTypes = Seq(
  BooleanType, ByteType, ShortType, IntegerType,
  LongType, FloatType, DoubleType, StringType,
  BinaryType, DateType, TimestampType
)

supportedTypes.foreach { sparkType =>
  val inspector = toInspector(sparkType)
  val backToSpark = inspectorToDataType(inspector)
  assert(sparkType == backToSpark, s"Round-trip failed for $sparkType")
}
```

### SerDe Integration

Integration with Hive's SerDe (Serializer/Deserializer) system.

```scala { .api }
// Example of working with deserialized SerDe output through inspectors.
// The inspector argument is typically serDe.getObjectInspector.
def processSerDeData(
    data: Any,
    serDe: AbstractSerDe,
    inspector: ObjectInspector): InternalRow = {

  // Create unwrapper for converting Hive data to Spark
  val unwrapper = unwrapperFor(inspector)

  // Convert Hive data to Spark representation; for a struct
  // inspector this yields an InternalRow
  unwrapper(data).asInstanceOf[InternalRow]
}
```
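
A hedged usage sketch for `processSerDeData`, assuming Hive's `LazySimpleSerDe` is on the classpath and relying on its default `^A` field delimiter (the two-argument `initialize` is the classic `AbstractSerDe` API, deprecated in recent Hive releases):

```scala
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
import org.apache.hadoop.io.Text

// Describe a two-column schema for the SerDe
val props = new Properties()
props.setProperty(serdeConstants.LIST_COLUMNS, "id,name")
props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,string")

val serDe = new LazySimpleSerDe()
serDe.initialize(new Configuration(), props)

// Deserialize one delimited record and unwrap it into an InternalRow
val raw = serDe.deserialize(new Text("1\u0001Alice"))
val row = processSerDeData(raw, serDe, serDe.getObjectInspector)
```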

### Error Handling

Common error patterns and handling in type conversion.

```scala { .api }
// Type conversion may throw for types with no Hive counterpart.
// CalendarIntervalType is used here as an illustrative example of
// a type that cannot be mapped to a Hive inspector.
try {
  val inspector = toInspector(CalendarIntervalType)
} catch {
  case _: UnsupportedOperationException =>
    println("Type not supported for Hive conversion")
  case _: IllegalArgumentException =>
    println("Invalid type configuration")
}
```

### Performance Considerations

Optimization tips for data conversion operations.

```scala { .api }
// Cache wrappers and unwrappers for repeated use
class CachedConverter(schema: StructType) {
  private val inspector = toInspector(schema)
  private val wrapper = wrapperFor(inspector, schema)
  private val unwrapper = unwrapperFor(inspector)

  def toHive(row: InternalRow): Any = wrapper(row)
  def fromHive(hiveData: Any): Any = unwrapper(hiveData)
}

// Use for batch operations (assumes schema: StructType and
// sparkRows: Seq[InternalRow] are already in scope)
val converter = new CachedConverter(schema)
val convertedRows = sparkRows.map(converter.toHive)
```

### Integration with Hive UDFs

Type conversion in the context of Hive UDF execution.

```scala { .api }
// Simplified sketch of HiveGenericUDF-style evaluation.
// outputType is the declared Spark result type; the unwrapper is
// derived from the inspector that initialize() returns.
def evaluateUDF(
    udf: GenericUDF,
    inputs: Seq[Any],
    inputTypes: Seq[DataType],
    outputType: DataType): Any = {

  // Convert inputs to Hive representation
  val inputInspectors = inputTypes.map(toInspector)
  val wrappers = inputInspectors.zip(inputTypes).map {
    case (inspector, dataType) => wrapperFor(inspector, dataType)
  }

  // initialize returns the ObjectInspector describing the UDF's output
  val outputInspector = udf.initialize(inputInspectors.toArray)

  // GenericUDF.evaluate expects DeferredObjects, not raw values
  val deferredInputs: Array[GenericUDF.DeferredObject] =
    inputs.zip(wrappers).map { case (input, wrapper) =>
      new GenericUDF.DeferredJavaObject(wrapper(input)): GenericUDF.DeferredObject
    }.toArray

  // Execute the UDF
  val hiveResult = udf.evaluate(deferredInputs)

  // Convert result back to Spark
  val unwrapper = unwrapperFor(outputInspector)
  unwrapper(hiveResult)
}
```

**Usage Example:**

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Set up UDF evaluation with proper type conversion
// (myGenericUDF is assumed to be a GenericUDF instance in scope)
val inputTypes = Seq(StringType, IntegerType)
val inputs = Seq(UTF8String.fromString("test"), 42)

val result = evaluateUDF(myGenericUDF, inputs, inputTypes, StringType)
println(s"UDF result: $result")
```