# Apache Spark Hive Integration

The Apache Spark Hive integration module provides seamless integration with the Apache Hive data warehouse, enabling Spark SQL to work with Hive tables, the Hive metastore, and Hive SerDes. It serves as a bridge that allows Spark applications to read from and write to Hive tables, use the Hive metastore for table metadata management, execute Hive UDFs within Spark queries, and maintain compatibility with existing Hive-based data pipelines.

## Package Information

- **Package Name**: org.apache.spark:spark-hive_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add to your Maven/SBT dependencies: `"org.apache.spark" %% "spark-hive" % "4.0.0"`
## Core Imports

```scala
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.SparkSession
```
## Basic Usage

```scala
import org.apache.spark.sql.SparkSession

// Create a SparkSession with Hive support
val spark = SparkSession.builder()
  .appName("HiveIntegrationExample")
  .enableHiveSupport()
  .getOrCreate()

// Use Hive tables directly
spark.sql("CREATE TABLE IF NOT EXISTS hive_table (id INT, name STRING)")
spark.sql("INSERT INTO hive_table VALUES (1, 'Alice'), (2, 'Bob')")

val df = spark.sql("SELECT * FROM hive_table")
df.show()

// Access the Hive metastore through the session catalog
// (SessionCatalog.listTables takes the database name)
val catalog = spark.sessionState.catalog
catalog.listTables("default").foreach(println)
```
## Architecture

The Spark Hive integration is built around several key components:

- **External Catalog**: `HiveExternalCatalog` provides complete Hive metastore integration for database, table, and partition operations
- **Client Abstraction**: the `HiveClient` interface abstracts different Hive versions (2.0.x through 4.0.x) behind a unified API
- **Data Conversion**: `HiveInspectors` handles bidirectional conversion between Spark and Hive data representations
- **UDF Support**: comprehensive support for Hive UDFs, UDAFs, and UDTFs (User-Defined Table Functions) with native Spark integration
- **Query Planning**: Hive-specific strategies and rules for optimized query execution
- **Configuration Management**: extensive configuration options for metastore connections and format conversions
## Important Note

This package is marked as **private internal API** in the Spark codebase. All classes are subject to change between minor releases. However, these APIs are still exposed and used by applications integrating with Spark Hive functionality.
## Capabilities

### External Catalog Operations

Complete Hive metastore integration providing database, table, partition, and function operations through the Spark catalog interface.

```scala { .api }
class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) extends ExternalCatalog {
  lazy val client: HiveClient

  // Database operations
  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
  def getDatabase(db: String): CatalogDatabase
  def listDatabases(): Seq[String]

  // Table operations
  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
  def getTable(db: String, table: String): CatalogTable
  def listTables(db: String): Seq[String]
}
```

[External Catalog](./external-catalog.md)
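To make the catalog contract concrete, here is a minimal in-memory sketch of the same create/get/list semantics, including `ignoreIfExists` handling. This is an illustrative stand-in, not Spark's `HiveExternalCatalog`; the `InMemoryCatalog` and `TableMeta` names are hypothetical.

```scala
// Hypothetical, simplified stand-in for the external catalog contract:
// table metadata keyed by (database, table), honoring ignoreIfExists.
case class TableMeta(db: String, name: String, schema: Map[String, String])

class InMemoryCatalog {
  private var tables = Map.empty[(String, String), TableMeta]

  def createTable(meta: TableMeta, ignoreIfExists: Boolean): Unit = {
    val key = (meta.db, meta.name)
    if (tables.contains(key)) {
      // Mirrors the catalog contract: creating an existing table is an
      // error unless the caller opts out with ignoreIfExists.
      if (!ignoreIfExists)
        throw new IllegalStateException(s"Table ${meta.db}.${meta.name} already exists")
    } else {
      tables += key -> meta
    }
  }

  def getTable(db: String, table: String): TableMeta =
    tables.getOrElse((db, table), throw new NoSuchElementException(s"$db.$table"))

  def listTables(db: String): Seq[String] =
    tables.keys.collect { case (`db`, t) => t }.toSeq.sorted
}
```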
### Hive Client Interface

Low-level interface to the Hive metastore client with version abstraction, supporting direct SQL execution and raw Hive operations.

```scala { .api }
trait HiveClient {
  def version: HiveVersion
  def runSqlHive(sql: String): Seq[String]

  // Database operations
  def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit
  def getDatabase(name: String): CatalogDatabase
  def listDatabases(pattern: String): Seq[String]

  // Table operations
  def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
  def getTable(dbName: String, tableName: String): CatalogTable
  def getRawHiveTable(dbName: String, tableName: String): RawHiveTable
}
```

[Hive Client](./hive-client.md)
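Note that `listDatabases(pattern)` filters by a Hive-style glob rather than a regular expression: `*` matches any character sequence and `|` separates alternatives. The sketch below models that matching under those assumed semantics (it is not Spark's implementation):

```scala
// Hedged sketch of Hive-style pattern filtering as used by
// listDatabases/listTables: '*' is a wildcard, '|' separates
// alternative patterns, matching is case-insensitive (assumed).
def filterPattern(names: Seq[String], pattern: String): Seq[String] = {
  val regexes = pattern.trim.split("\\|").toSeq.map { p =>
    ("(?i)" + p.trim.replace("*", ".*")).r
  }
  names.filter(n => regexes.exists(_.matches(n)))
}
```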
### Data Type Conversion

Bidirectional conversion system between Spark and Hive data representations, handling complex nested types and Hive SerDe integration.

```scala { .api }
trait HiveInspectors {
  def inspectorToDataType(inspector: ObjectInspector): DataType
  def toInspector(dataType: DataType): ObjectInspector
  def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any
  def unwrapperFor(objectInspector: ObjectInspector): Any => Any

  implicit class typeInfoConversions(dt: DataType) {
    def toTypeInfo: TypeInfo
  }
}
```

[Data Conversion](./data-conversion.md)
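The `toTypeInfo` direction can be pictured with a simplified model: a recursive mapping from a Spark-like type ADT down to Hive type-name strings, descending into array, map, and struct elements. The `SimpleType` ADT below is invented for illustration; it is not Spark's `DataType` hierarchy.

```scala
// Simplified model of nested data types and their Hive type names.
sealed trait SimpleType
case object IntT extends SimpleType
case object StringT extends SimpleType
case object DoubleT extends SimpleType
case class ArrayT(element: SimpleType) extends SimpleType
case class MapT(key: SimpleType, value: SimpleType) extends SimpleType
case class StructT(fields: Seq[(String, SimpleType)]) extends SimpleType

// Recursively render the Hive type name, mirroring how a
// toTypeInfo-style conversion walks nested types.
def toHiveTypeName(t: SimpleType): String = t match {
  case IntT        => "int"
  case StringT     => "string"
  case DoubleT     => "double"
  case ArrayT(e)   => s"array<${toHiveTypeName(e)}>"
  case MapT(k, v)  => s"map<${toHiveTypeName(k)},${toHiveTypeName(v)}>"
  case StructT(fs) =>
    fs.map { case (n, ft) => s"$n:${toHiveTypeName(ft)}" }
      .mkString("struct<", ",", ">")
}
```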
### User-Defined Function Support

Complete support for Hive UDFs, UDAFs, and UDTFs with automatic registration and execution within Spark queries.

```scala { .api }
case class HiveSimpleUDF(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression]
) extends Expression with HiveInspectors with UserDefinedExpression

case class HiveUDAFFunction(
  name: String,
  funcWrapper: HiveFunctionWrapper,
  children: Seq[Expression],
  isUDAFBridgeRequired: Boolean,
  mutableAggBufferOffset: Int,
  inputAggBufferOffset: Int
) extends TypedImperativeAggregate[HiveUDAFBuffer]
```

[UDF Support](./udf-support.md)
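As a `TypedImperativeAggregate`, `HiveUDAFFunction` follows a buffer lifecycle: each partition creates a buffer, updates it per input row, partial buffers are merged, and a final value is evaluated. The following language-level sketch illustrates that lifecycle with a hypothetical average aggregate; it uses none of Spark's aggregation framework.

```scala
// Hypothetical aggregation buffer illustrating the
// create / update / merge / eval lifecycle of a typed imperative aggregate.
final case class AvgBuffer(var sum: Long, var count: Long)

def createBuffer(): AvgBuffer = AvgBuffer(0L, 0L)

// Called once per input row within a partition.
def update(buf: AvgBuffer, input: Long): AvgBuffer = {
  buf.sum += input; buf.count += 1; buf
}

// Combines partial buffers produced by different partitions.
def merge(a: AvgBuffer, b: AvgBuffer): AvgBuffer = {
  a.sum += b.sum; a.count += b.count; a
}

// Produces the final result from the merged buffer.
def eval(buf: AvgBuffer): Double =
  if (buf.count == 0) 0.0 else buf.sum.toDouble / buf.count
```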
### Configuration Management

Comprehensive configuration system for Hive integration with metastore connection settings, format conversions, and JAR management.

```scala { .api }
object HiveUtils {
  val builtinHiveVersion: String

  // Configuration entries
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val HIVE_METASTORE_JARS: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

  // Client creation
  def newClientForExecution(conf: SparkConf, hadoopConf: Configuration): HiveClientImpl
  def newClientForMetadata(conf: SparkConf, hadoopConf: Configuration,
      configurations: Map[String, String]): HiveClient
}
```

[Configuration](./configuration.md)
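These entries surface as user-facing Spark SQL configuration keys. As an example, a `spark-defaults.conf` fragment might set them as follows (the version value here is a placeholder, not a recommendation):

```properties
# Version of the Hive metastore to connect to (HIVE_METASTORE_VERSION)
spark.sql.hive.metastore.version        2.3.10
# Where metastore client JARs come from (HIVE_METASTORE_JARS):
# "builtin", "maven", "path", or a classpath string
spark.sql.hive.metastore.jars           builtin
# Use Spark's native readers for Hive Parquet/ORC tables
spark.sql.hive.convertMetastoreParquet  true
spark.sql.hive.convertMetastoreOrc      true
```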
## Types

### Core Types

```scala { .api }
// Hive version representation
abstract class HiveVersion(
  val fullVersion: String,
  val extraDeps: Seq[String] = Nil,
  val exclusions: Seq[String] = Nil
) extends Ordered[HiveVersion]

// Raw Hive table interface
trait RawHiveTable {
  def rawTable: Object
  def toCatalogTable: CatalogTable
  def hiveTableProps(): Map[String, String]
}

// UDF wrapper
case class HiveFunctionWrapper(functionClassName: String)

// UDAF buffer
case class HiveUDAFBuffer(buf: AggregationBuffer, canDoMerge: Boolean)
```
### Supported Hive Versions

```scala { .api }
object hive {
  case object v2_0 extends HiveVersion("2.0.1")
  case object v2_1 extends HiveVersion("2.1.1")
  case object v2_2 extends HiveVersion("2.2.0")
  case object v2_3 extends HiveVersion("2.3.10")
  case object v3_0 extends HiveVersion("3.0.0")
  case object v3_1 extends HiveVersion("3.1.3")
  case object v4_0 extends HiveVersion("4.0.1")

  val allSupportedHiveVersions: Set[HiveVersion]
}
```
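Because `HiveVersion` extends `Ordered[HiveVersion]`, versions compare numerically segment by segment rather than as strings (so "2.3.10" sorts above "2.3.9"). A self-contained sketch of such ordering, with an invented `Version` class for illustration:

```scala
// Compare dotted version strings numerically, segment by segment;
// a plain string compare would wrongly put "2.3.10" below "2.3.9".
final case class Version(full: String) extends Ordered[Version] {
  private val parts: Seq[Int] = full.split("\\.").toSeq.map(_.toInt)

  def compare(that: Version): Int = {
    // Pad the shorter version with zeros, then find the first
    // differing segment.
    val paired = parts.zipAll(that.parts, 0, 0)
    paired.collectFirst { case (a, b) if a != b => a.compare(b) }.getOrElse(0)
  }
}
```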