# Apache Spark Hive Integration

Apache Spark SQL Hive Integration provides a compatibility layer for running Hive queries and accessing Hive tables through Spark's distributed processing engine. It lets organizations take advantage of Spark's performance while remaining compatible with existing Hive-based data warehousing infrastructure.

## Package Information

- **Package Name**: spark-hive_2.10
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-hive_2.10
- **Version**: 2.2.3
- **Maven Installation**:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>2.2.3</version>
</dependency>
```
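
For sbt builds, a minimal equivalent (a sketch assuming a project built for the matching Scala binary version, 2.10 here) is:

```scala
// build.sbt — %% appends the project's Scala binary version to the artifact ID
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.3"
```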

## Core Imports

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveContext // Deprecated
import org.apache.spark.sql.hive.HiveUtils
```

## Basic Usage

### Modern Approach (Recommended)

```scala
import org.apache.spark.sql.SparkSession

// Create SparkSession with Hive support
val spark = SparkSession.builder()
  .appName("Hive Integration Example")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

// Use HiveQL
val df = spark.sql("SELECT * FROM hive_table")
df.show()

// Access Hive tables
val table = spark.table("my_database.my_table")
table.createOrReplaceTempView("temp_table")
```

### Legacy Approach (Deprecated)

```scala
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext

val sc = new SparkContext()
val hiveContext = new HiveContext(sc)

// Run HiveQL queries
val results = hiveContext.sql("SELECT * FROM hive_table")
results.show()
```

## Architecture

The Spark Hive integration is built around several key components:

- **Hive Compatibility Layer**: Provides seamless integration with the existing Hive metastore and table formats
- **Query Engine**: Translates HiveQL queries into Spark execution plans using the Catalyst optimizer
- **File Format Support**: Native support for ORC, Parquet, and other Hive-compatible formats
- **UDF Integration**: Execute existing Hive UDFs, UDAFs, and UDTFs within Spark
- **Metastore Integration**: Read/write table metadata from the Hive metastore with version compatibility

## Capabilities

### Core Hive Integration

Primary entry points and configuration utilities for Hive integration.

```scala { .api }
// Modern entry point (recommended)
object SparkSession {
  def builder(): Builder
}

class Builder {
  def enableHiveSupport(): Builder
}

// Legacy entry point (deprecated since 2.0.0)
class HiveContext(sparkSession: SparkSession) extends SQLContext(sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)
  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}

// Configuration utilities
object HiveUtils {
  val hiveExecutionVersion: String
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
}
```
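
As an illustrative sketch of how these pieces compose, `newTemporaryConfiguration` can seed a session with a throwaway, in-memory Derby-backed metastore for local testing. Note that `HiveUtils` is an internal API, so this assumes your code can actually see it (e.g., test code compiled under the Spark namespace); the master and app name are arbitrary:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Start from a builder with Hive support enabled
val builder = SparkSession.builder()
  .appName("hive-metastore-sandbox")
  .master("local[*]")
  .enableHiveSupport()

// Apply throwaway metastore settings (in-memory Derby) before session creation
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
  .foreach { case (k, v) => builder.config(k, v) }

val spark = builder.getOrCreate()
spark.sql("SHOW DATABASES").show()
```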

[Core Hive Integration](./core-hive-integration.md)

### File Format Support

Native support for ORC and Hive-compatible file formats with optimization features.

```scala { .api }
class OrcFileFormat extends FileFormat with DataSourceRegister {
  def shortName(): String
  def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]
  ): Option[StructType]

  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}

class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
  def prepareWrite(
    sparkSession: SparkSession,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType
  ): OutputWriterFactory
}
```
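
For example, with the `spark` session from Basic Usage, ORC data written and read through the DataFrame API goes through `OrcFileFormat`; a minimal sketch (the output path is illustrative):

```scala
// Write a small DataFrame as ORC, then read it back
val events = spark.range(0, 1000).selectExpr("id", "id % 7 AS bucket")
events.write.mode("overwrite").orc("/tmp/events_orc")

val restored = spark.read.orc("/tmp/events_orc")
restored.printSchema()
restored.groupBy("bucket").count().show()
```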

[File Format Support](./file-formats.md)

### UDF Integration

Comprehensive support for executing Hive user-defined functions within Spark.

```scala { .api }
// Simple UDFs
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression {
  def eval(input: InternalRow): Any
  def prettyName: String
}

// Generic UDFs
case class HiveGenericUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression

// Aggregate functions (UDAFs)
case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends TypedImperativeAggregate[AggregationBuffer]

// Table-generating functions (UDTFs)
case class HiveGenericUDTF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Generator
```
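
In practice, existing Hive UDFs are usually invoked through SQL rather than these internal expression classes. A sketch, where `com.example.udf.MyUpper` stands in for your own UDF implementation already on the classpath:

```scala
// Register a Hive UDF by its implementing class, then call it in HiveQL
spark.sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.udf.MyUpper'")
spark.sql("SELECT my_upper(name) FROM my_database.my_table").show()
```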

[UDF Integration](./udf-integration.md)

### Metastore Operations

Interface for interacting with the Hive metastore for database, table, and partition management.

```scala { .api }
trait HiveClient {
  // Version and configuration
  def version: HiveVersion
  def getConf(key: String, defaultValue: String): String

  // SQL execution
  def runSqlHive(sql: String): Seq[String]

  // Database operations
  def listTables(dbName: String): Seq[String]
  def setCurrentDatabase(databaseName: String): Unit
  def getDatabase(name: String): CatalogDatabase
  def databaseExists(dbName: String): Boolean

  // Table operations
  def tableExists(dbName: String, tableName: String): Boolean
  def getTable(dbName: String, tableName: String): CatalogTable
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit

  // Partition operations
  def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit
  def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec]): Seq[CatalogTablePartition]

  // Function operations
  def createFunction(db: String, func: CatalogFunction): Unit
  def listFunctions(db: String, pattern: String): Seq[String]
}
```
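
`HiveClient` is an internal interface; application code typically reaches the same metadata through the public `spark.catalog` API, which is backed by the Hive metastore when Hive support is enabled. A sketch using the session and database names from the earlier examples:

```scala
// Browse metastore-backed metadata through the public catalog API
spark.catalog.listDatabases().show()
spark.catalog.listTables("my_database").show()
spark.catalog.setCurrentDatabase("my_database")
println(spark.catalog.tableExists("my_database", "my_table"))
```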

[Metastore Operations](./metastore-operations.md)

### Execution Engine

Physical execution plans and strategies for Hive table operations.

```scala { .api }
case class HiveTableScanExec(
  requestedAttributes: Seq[Attribute],
  relation: HiveTableRelation,
  partitionPruningPred: Seq[Expression]
) extends LeafExecNode {
  def doExecute(): RDD[InternalRow]
}

case class InsertIntoHiveTable(
  table: CatalogTable,
  partition: Map[String, Option[String]],
  query: LogicalPlan,
  overwrite: Boolean,
  ifPartitionNotExists: Boolean
) extends UnaryCommand {
  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}

case class CreateHiveTableAsSelectCommand(
  tableDesc: CatalogTable,
  query: LogicalPlan,
  ignoreIfExists: Boolean
) extends DataWritingCommand
```
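
These plans are not constructed directly; the planner produces them for ordinary SQL. A sketch, assuming the session from Basic Usage and an illustrative `id` column on `my_database.my_table`:

```scala
// CTAS plans to CreateHiveTableAsSelectCommand
spark.sql("CREATE TABLE my_database.daily_counts AS SELECT id % 7 AS bucket, count(*) AS n FROM my_database.my_table GROUP BY id % 7")

// INSERT plans to InsertIntoHiveTable
spark.sql("INSERT INTO my_database.daily_counts SELECT id % 7, count(*) FROM my_database.my_table GROUP BY id % 7")

// Reading back scans through HiveTableScanExec; explain() shows the physical plan
spark.sql("SELECT * FROM my_database.daily_counts").explain()
```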

[Execution Engine](./execution-engine.md)

## Configuration

### Key Hive Configuration Properties

- **spark.sql.hive.metastore.version**: Hive metastore version (default: 1.2.1)
- **spark.sql.hive.metastore.jars**: Location of Hive metastore JARs ("builtin", "maven", or a classpath)
- **spark.sql.hive.convertMetastoreParquet**: Use Spark's Parquet reader for Hive Parquet tables (default: true)
- **spark.sql.hive.convertMetastoreOrc**: Use Spark's ORC reader for Hive ORC tables (default: false)
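
These properties can be supplied when the session is built; the metastore version and JAR settings must be in place before the Hive catalog is first initialized. A minimal sketch mirroring the defaults listed above:

```scala
val spark = SparkSession.builder()
  .appName("configured-hive-session")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  .config("spark.sql.hive.metastore.jars", "builtin")
  .config("spark.sql.hive.convertMetastoreParquet", "true")
  .config("spark.sql.hive.convertMetastoreOrc", "false")
  .enableHiveSupport()
  .getOrCreate()
```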

### Supported Hive Versions

- Hive 0.12.0 through 2.1.1
- Default execution version: 1.2.1
- Configurable metastore version for compatibility

## Migration from HiveContext

The `HiveContext` class is deprecated as of Spark 2.0.0. Migration steps (a before/after sketch follows the list):

1. Replace `HiveContext` with `SparkSession.builder().enableHiveSupport()`
2. Update configuration from Hive-specific settings to Spark SQL settings
3. Use `spark.sql()` instead of `hiveContext.sql()` for queries
4. Access the catalog through `spark.catalog` instead of direct metastore calls
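
A minimal before/after sketch of the same query under both APIs:

```scala
// Before: deprecated HiveContext
val sc = new org.apache.spark.SparkContext()
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SELECT count(*) FROM my_database.my_table").show()

// After: SparkSession with Hive support
val spark = org.apache.spark.sql.SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
spark.sql("SELECT count(*) FROM my_database.my_table").show()
```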

## Error Handling

Common exceptions and error patterns (a handling sketch follows the list):

- **AnalysisException**: Thrown for invalid table references or schema mismatches
- **HiveException**: Wrapper for underlying Hive metastore errors
- **UnsupportedOperationException**: For unsupported Hive features or version incompatibilities
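
For example, an invalid table reference surfaces as an `AnalysisException` (the table name below is deliberately nonexistent):

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELECT * FROM my_database.no_such_table").show()
} catch {
  case e: AnalysisException =>
    println(s"Query rejected at analysis time: ${e.getMessage}")
}
```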

## Types

```scala { .api }
// Configuration entries
abstract class ConfigEntry[T] {
  def key: String
  def defaultValue: Option[T]
  def doc: String
}

// Hive version representation
abstract class HiveVersion {
  def fullVersion: String
  def extraDeps: Seq[String]
  def exclusions: Seq[String]
}

// Function wrapper for Hive UDFs
case class HiveFunctionWrapper(
  className: String,
  instance: AnyRef
)

// Table partition specification
type TablePartitionSpec = Map[String, String]

// Catalog types (from Spark SQL)
case class CatalogTable(
  identifier: TableIdentifier,
  tableType: CatalogTableType,
  storage: CatalogStorageFormat,
  schema: StructType,
  partitionColumnNames: Seq[String] = Seq.empty,
  bucketSpec: Option[BucketSpec] = None
)

case class CatalogTablePartition(
  spec: TablePartitionSpec,
  storage: CatalogStorageFormat,
  parameters: Map[String, String] = Map.empty
)

case class CatalogDatabase(
  name: String,
  description: String,
  locationUri: String,
  properties: Map[String, String]
)

case class CatalogFunction(
  identifier: FunctionIdentifier,
  className: String,
  resources: Seq[FunctionResource]
)
```