# Core Hive Integration

This document covers the primary entry points and configuration utilities for Apache Spark's Hive integration, including the legacy HiveContext and the modern SparkSession approach.

## SparkSession with Hive Support (Recommended)

The modern approach uses SparkSession with Hive support enabled instead of the deprecated HiveContext.

### Creating a Hive-Enabled SparkSession

```scala { .api }
object SparkSession {
  def builder(): Builder
}

class Builder {
  def appName(name: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}
```
**Usage Example:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Hive Integration Application")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

// Use HiveQL
val df = spark.sql("SELECT * FROM my_hive_table WHERE year = 2023")
df.show()

// Access the catalog
spark.catalog.listTables("default").show()

// Create tables
spark.sql("""
  CREATE TABLE IF NOT EXISTS employee (
    id INT,
    name STRING,
    department STRING
  ) USING HIVE
  PARTITIONED BY (year INT)
""")
```
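Once a partitioned table like `employee` above exists, data can be loaded into it with a dynamic-partition insert. A minimal sketch, assuming a running metastore, a Hive-enabled session named `spark`, and the table definition above (the sample rows and the `staff` name are illustrative):

```scala
// Sketch: appending rows to the partitioned `employee` table created above.
// Assumes an active Hive-enabled SparkSession named `spark`.
import spark.implicits._

// Allow dynamic partitioning without a static partition spec
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

val staff = Seq(
  (1, "Ada", "Engineering", 2023),
  (2, "Grace", "Research", 2023)
).toDF("id", "name", "department", "year")

// insertInto matches columns by position; `year` feeds the partition column
staff.write.mode("append").insertInto("employee")
```

Note that `insertInto` resolves columns positionally rather than by name, so the DataFrame's column order must match the table schema with partition columns last.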
## HiveContext (Deprecated)

**Note**: HiveContext is deprecated as of Spark 2.0.0. Use SparkSession with enableHiveSupport() instead.

```scala { .api }
class HiveContext private[hive](_sparkSession: SparkSession) extends SQLContext(_sparkSession) {
  def this(sc: SparkContext)
  def this(sc: JavaSparkContext)

  def newSession(): HiveContext
  def refreshTable(tableName: String): Unit
}
```

**Legacy Usage Example:**

```scala
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext

val sc = new SparkContext()
val hiveContext = new HiveContext(sc)

// Execute HiveQL
val results = hiveContext.sql("SELECT COUNT(*) FROM my_table")
results.show()

// Refresh table metadata
hiveContext.refreshTable("my_table")

// Create new session
val newSession = hiveContext.newSession()
```
## HiveUtils Configuration

The HiveUtils object provides configuration constants and utility functions for Hive integration setup.

```scala { .api }
object HiveUtils extends Logging {
  // Version constants
  val hiveExecutionVersion: String // "1.2.1"

  // Configuration entries
  val HIVE_METASTORE_VERSION: ConfigEntry[String]
  val HIVE_EXECUTION_VERSION: ConfigEntry[String]
  val HIVE_METASTORE_JARS: ConfigEntry[String]
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
  val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]
  val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]
  val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]

  // Utility methods
  def withHiveExternalCatalog(sc: SparkContext): SparkContext
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]
  def inferSchema(table: CatalogTable): CatalogTable
}
```

### Configuration Properties

Access configuration constants through HiveUtils:

```scala
import org.apache.spark.sql.hive.HiveUtils

// Set metastore version
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")

// Configure metastore JARs location
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

// Enable ORC conversion
spark.conf.set(HiveUtils.CONVERT_METASTORE_ORC.key, "true")

// Enable Parquet conversion
spark.conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
```

Note that the metastore version and JAR settings determine how the Hive client is instantiated, so they must be in place before the session first touches Hive; in practice they are supplied via SparkSession.builder().config(...) or spark-defaults.conf rather than set on an already-running session.
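Since the metastore settings take effect only at session creation, they are typically passed when the session is built. A sketch, assuming HiveUtils is on the classpath and a metastore compatible with the chosen version is reachable (the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Supply Hive settings at build time, before the metastore client exists
val spark = SparkSession.builder()
  .appName("Configured Hive App")
  .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")
  .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
  .enableHiveSupport()
  .getOrCreate()
```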
### Key Configuration Options

- **HIVE_METASTORE_VERSION**: Specifies the Hive metastore version for compatibility
  - Default: Spark's built-in Hive version
  - Example values: "1.2.1", "2.1.1", "2.3.0"

- **HIVE_METASTORE_JARS**: Location of the Hive metastore JARs
  - "builtin": Use Spark's built-in Hive JARs
  - "maven": Download JARs from a Maven repository
  - Classpath string: Use JARs from the specified classpath

- **CONVERT_METASTORE_PARQUET**: Use Spark's native Parquet reader for Hive tables stored as Parquet
  - Default: true
  - When enabled, provides better performance and predicate pushdown

- **CONVERT_METASTORE_ORC**: Use Spark's native ORC reader for Hive tables stored as ORC
  - Default: true
  - Enables vectorized ORC reading and advanced optimizations
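For reference, the entries above correspond to the following literal configuration keys as they appear in stock Spark builds; this mapping is provided for use in spark-defaults.conf, and in code the `.key` accessors on HiveUtils should be preferred so that renames are caught at compile time:

```scala
// Literal keys behind the HiveUtils ConfigEntry values (stock Spark builds);
// prefer HiveUtils.*.key in real code.
val hiveConfigKeys: Map[String, String] = Map(
  "HIVE_METASTORE_VERSION"          -> "spark.sql.hive.metastore.version",
  "HIVE_METASTORE_JARS"             -> "spark.sql.hive.metastore.jars",
  "CONVERT_METASTORE_PARQUET"       -> "spark.sql.hive.convertMetastoreParquet",
  "CONVERT_METASTORE_ORC"           -> "spark.sql.hive.convertMetastoreOrc",
  "HIVE_METASTORE_SHARED_PREFIXES"  -> "spark.sql.hive.metastore.sharedPrefixes",
  "HIVE_METASTORE_BARRIER_PREFIXES" -> "spark.sql.hive.metastore.barrierPrefixes"
)

// Every key lives under the spark.sql.hive namespace
assert(hiveConfigKeys.values.forall(_.startsWith("spark.sql.hive.")))
```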
## HiveSessionStateBuilder

Builder for creating Hive-enabled Spark SQL session state with the proper catalog and analyzer setup.

```scala { .api }
class HiveSessionStateBuilder(
  session: SparkSession,
  parentState: Option[SessionState] = None
) extends BaseSessionStateBuilder(session, parentState) {

  override protected def catalog: HiveSessionCatalog
  override protected def externalCatalog: ExternalCatalog
  override protected def analyzer: Analyzer
  override protected def optimizer: Optimizer
  override protected def planner: SparkPlanner
}
```

This class is typically used internally by Spark when creating Hive-enabled sessions, but it can be extended for custom session state requirements.

## Utility Methods

### withHiveExternalCatalog

Configures a SparkContext to use Hive as the external catalog:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveUtils

val sc = new SparkContext()
val hiveEnabledSc = HiveUtils.withHiveExternalCatalog(sc)
```

### newTemporaryConfiguration

Creates a temporary configuration for testing with an in-memory Derby database:

```scala
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
// Returns Map[String, String] with Derby-specific Hive settings
```
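In a test harness, the returned map can be folded into a session builder so each suite gets an isolated, throwaway metastore. A sketch, assuming a local-mode test environment (the master setting and session name are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Fold the Derby-backed settings into a local test session
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)

val builder = SparkSession.builder()
  .master("local[2]")
  .appName("hive-test-session")
  .enableHiveSupport()

val testSpark = tempConfig
  .foldLeft(builder) { case (b, (k, v)) => b.config(k, v) }
  .getOrCreate()
```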
### inferSchema

Infers and validates schema information for Hive tables:

```scala
val inferredTable = HiveUtils.inferSchema(catalogTable)
// Returns a CatalogTable with inferred schema information
```

## Version Compatibility

The Hive integration supports multiple Hive versions through configurable metastore and execution versions:

### Supported Hive Versions

- **0.12.0** (v12)
- **0.13.1** (v13)
- **0.14.0** (v14)
- **1.0.0** (v1_0)
- **1.1.0** (v1_1)
- **1.2.1** (v1_2) - Default execution version
- **2.0.1** (v2_0)
- **2.1.1** (v2_1)
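A small guard can catch version typos before a string reaches the metastore configuration. A sketch built from the table above; the helper name is ours, not part of HiveUtils:

```scala
// Supported metastore versions, taken from the table above
val supportedHiveVersions: Set[String] = Set(
  "0.12.0", "0.13.1", "0.14.0",
  "1.0.0", "1.1.0", "1.2.1",
  "2.0.1", "2.1.1"
)

// Hypothetical guard: fail fast before building the session
def requireSupportedVersion(v: String): String = {
  require(supportedHiveVersions.contains(v), s"Unsupported Hive version: $v")
  v
}
```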
### Setting Specific Versions

```scala
// Set a metastore version different from the execution version
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1.1")
spark.conf.set(HiveUtils.HIVE_EXECUTION_VERSION.key, "1.2.1")

// Use Maven to download the specific version's JARs
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
```

## Error Handling

Common errors and their resolutions:

### NoClassDefFoundError

Occurs when Hive JARs are not available:

```scala
// Solution: Configure metastore JARs
spark.conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
```

### Version Incompatibility

When metastore and execution versions conflict:

```scala
// Solution: Set compatible versions
spark.conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "1.2.1")
```

### Metastore Connection Issues

When unable to connect to the Hive metastore:

```scala
// Solution: Configure the metastore URI
spark.conf.set("hive.metastore.uris", "thrift://metastore-host:9083")
```
## Migration Guide

### From HiveContext to SparkSession

**Before (Deprecated):**

```scala
val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM table")
```

**After (Recommended):**

```scala
val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
val df = spark.sql("SELECT * FROM table")
```

### Configuration Migration

HiveContext-specific configurations should be moved to SparkSession:

**Before:**

```scala
hiveContext.setConf("hive.exec.dynamic.partition", "true")
```

**After:**

```scala
spark.conf.set("hive.exec.dynamic.partition", "true")
```

## Types

```scala { .api }
// Configuration entry for Hive settings
abstract class ConfigEntry[T] {
  def key: String
  def defaultValue: Option[T]
  def doc: String
  def valueConverter: String => T
  def stringConverter: T => String
}

// Session state builder base class
abstract class BaseSessionStateBuilder(
  session: SparkSession,
  parentState: Option[SessionState]
) {
  protected def catalog: SessionCatalog
  protected def analyzer: Analyzer
  protected def optimizer: Optimizer
  def build(): SessionState
}

// Hive-specific session catalog
class HiveSessionCatalog(
  externalCatalogBuilder: () => ExternalCatalog,
  globalTempViewManagerBuilder: () => GlobalTempViewManager,
  functionRegistry: FunctionRegistry,
  conf: SQLConf,
  hadoopConf: Configuration,
  parser: ParserInterface,
  functionResourceLoader: FunctionResourceLoader
) extends SessionCatalog
```