# Configuration Management

Comprehensive configuration system for Hive integration, covering metastore connection settings, format conversions, and JAR management. The `HiveUtils` object provides centralized configuration management and client factory methods.

## Capabilities

### HiveUtils Object

Central configuration and utility object for Hive integration.

```scala { .api }
/**
 * Central configuration and utility object for Hive integration.
 * Provides configuration constants and client factory methods.
 */
object HiveUtils {

  /**
   * Built-in Hive version used by Spark
   */
  val builtinHiveVersion: String
}
```
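
Knowing the bundled version helps when deciding whether to override `HIVE_METASTORE_VERSION`. A minimal sketch, assuming the standard `org.apache.spark.sql.hive` package location for `HiveUtils`:

```scala
import org.apache.spark.sql.hive.HiveUtils

// Log the Hive version bundled with this Spark build before
// choosing a metastore version override.
println(s"Built-in Hive version: ${HiveUtils.builtinHiveVersion}")
```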

### Core Configuration Entries

Essential configuration options for Hive integration.

```scala { .api }
/**
 * Built-in Hive version configuration
 */
val BUILTIN_HIVE_VERSION: ConfigEntry[String]

/**
 * Hive metastore version to use
 * Default: matches the built-in version
 */
val HIVE_METASTORE_VERSION: ConfigEntry[String]

/**
 * Hive metastore JAR location strategy
 * Options: "builtin", "maven", or "path" (custom JAR locations)
 */
val HIVE_METASTORE_JARS: ConfigEntry[String]

/**
 * Custom paths for Hive metastore JARs
 * Used when HIVE_METASTORE_JARS is set to "path"
 */
val HIVE_METASTORE_JARS_PATH: ConfigEntry[Seq[String]]
```

**Usage Example:**

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// Use built-in Hive JARs
conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "builtin")

// Or use Maven to download a specific version
conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
conf.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.10")

// Or specify custom JAR paths
conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "path")
conf.set(HiveUtils.HIVE_METASTORE_JARS_PATH.key, "/path/to/hive/lib/*")
```
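
After the session starts, the effective values can be read back through the runtime configuration. A quick sanity check, assuming an active `SparkSession` named `spark`:

```scala
// Verify which JAR strategy and metastore version the session resolved
println(spark.conf.get(HiveUtils.HIVE_METASTORE_JARS.key))
println(spark.conf.get(HiveUtils.HIVE_METASTORE_VERSION.key))
```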

### Format Conversion Configuration

Control automatic conversion between Hive SerDe and Spark native formats.

```scala { .api }
/**
 * Enable automatic conversion of Hive SerDe Parquet tables to Spark native format
 * Default: true
 */
val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]

/**
 * Enable schema merging when converting Parquet tables
 * Default: false
 */
val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING: ConfigEntry[Boolean]

/**
 * Enable automatic conversion of Hive SerDe ORC tables to Spark native format
 * Default: true
 */
val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

/**
 * Enable conversion for INSERT operations on partitioned tables
 * Default: true
 */
val CONVERT_INSERTING_PARTITIONED_TABLE: ConfigEntry[Boolean]

/**
 * Enable conversion for INSERT operations on unpartitioned tables
 * Default: true
 */
val CONVERT_INSERTING_UNPARTITIONED_TABLE: ConfigEntry[Boolean]

/**
 * Enable conversion for CREATE TABLE AS SELECT operations
 * Default: true
 */
val CONVERT_METASTORE_CTAS: ConfigEntry[Boolean]

/**
 * Enable conversion for INSERT OVERWRITE DIRECTORY operations
 * Default: true
 */
val CONVERT_METASTORE_INSERT_DIR: ConfigEntry[Boolean]
```

**Usage Example:**

```scala
val conf = new SparkConf()

// Disable Parquet conversion for Hive SerDe compatibility
conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "false")

// Enable schema merging for Parquet
conf.set(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING.key, "true")

// Disable ORC conversion
conf.set(HiveUtils.CONVERT_METASTORE_ORC.key, "false")

// Fine-tune insertion behavior
conf.set(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE.key, "false")
conf.set(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE.key, "true")
```

### Classloader Configuration

Control classloader isolation for Hive integration.

```scala { .api }
/**
 * Class prefixes that should be shared between the Spark and Hive classloaders
 * Default: Java standard library, Hadoop, and Hive API classes
 */
val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]

/**
 * Class prefixes that should be isolated in the Hive classloader
 * Default: Hive implementation classes
 */
val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]
```

**Usage Example:**

```scala
val conf = new SparkConf()

// Add custom shared prefixes
val sharedPrefixes = Seq(
  "java.",
  "javax.",
  "org.apache.hadoop.",
  "com.mycompany.shared."
)
conf.set(HiveUtils.HIVE_METASTORE_SHARED_PREFIXES.key, sharedPrefixes.mkString(","))

// Add custom barrier prefixes
val barrierPrefixes = Seq(
  "org.apache.hive.",
  "com.mycompany.hive."
)
conf.set(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, barrierPrefixes.mkString(","))
```

### Thrift Server Configuration

Configuration for Hive Thrift Server integration.

```scala { .api }
/**
 * Enable asynchronous query processing in the Hive Thrift Server
 * Default: true
 */
val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]
```

**Usage Example:**

```scala
val conf = new SparkConf()

// Disable async processing for easier debugging
conf.set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")
```

### Client Factory Methods

Create Hive clients for different use cases.

```scala { .api }
/**
 * Create a Hive client for SQL execution
 * @param conf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @return HiveClientImpl instance for execution
 */
def newClientForExecution(conf: SparkConf, hadoopConf: Configuration): HiveClientImpl

/**
 * Create a Hive client for metadata operations
 * @param conf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @param configurations Additional Hive configuration overrides
 * @return HiveClient instance for metadata operations
 */
def newClientForMetadata(
    conf: SparkConf,
    hadoopConf: Configuration,
    configurations: Map[String, String]): HiveClient
```

**Usage Example:**

```scala
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration

val sparkConf = new SparkConf()
val hadoopConf = new Configuration()

// Client for executing Hive SQL
val executionClient = HiveUtils.newClientForExecution(sparkConf, hadoopConf)
val results = executionClient.runSqlHive("SHOW DATABASES")

// Client for metadata operations with custom settings
val metadataConf = Map(
  "hive.metastore.uris" -> "thrift://metastore:9083",
  "hive.metastore.connect.retries" -> "3"
)
val metadataClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf, metadataConf)
val databases = metadataClient.listDatabases("*")
```

### Utility Methods

Helper methods for common operations.

```scala { .api }
/**
 * Check whether the current session uses the Hive CLI session state
 * @return true if a CLI session state is active
 */
def isCliSessionState(): Boolean

/**
 * Create a temporary configuration for testing
 * @param useInMemoryDerby Whether to use an in-memory Derby database
 * @return Map of temporary configuration settings
 */
def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]

/**
 * Infer the schema of a Hive table
 * @param table Catalog table definition
 * @return Table with inferred schema
 */
def inferSchema(table: CatalogTable): CatalogTable

/**
 * Parse a partition name into its values
 * @param name Partition name (e.g., "year=2023/month=01")
 * @return Array of partition values
 */
def partitionNameToValues(name: String): Array[String]
```

**Usage Examples:**

```scala
// Check session state
if (HiveUtils.isCliSessionState()) {
  println("Running in CLI mode")
}

// Create a temporary configuration for testing
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
tempConfig.foreach { case (key, value) =>
  println(s"$key = $value")
}

// Infer the schema of a table
val tableWithInferredSchema = HiveUtils.inferSchema(catalogTable)
println(s"Inferred schema: ${tableWithInferredSchema.schema}")

// Parse partition values
val partitionValues = HiveUtils.partitionNameToValues("year=2023/month=01/day=15")
// Result: Array("2023", "01", "15")
```

### Configuration Best Practices

Recommended configuration patterns for different scenarios.

```scala
// Production configuration
def productionHiveConfig(): SparkConf = {
  new SparkConf()
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
    .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.10")
    .set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
    .set(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
    .set(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")
}

// Development configuration with custom JARs
def developmentHiveConfig(hivePath: String): SparkConf = {
  new SparkConf()
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "path")
    .set(HiveUtils.HIVE_METASTORE_JARS_PATH.key, s"$hivePath/lib/*")
    .set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "false") // for debugging
    .set(HiveUtils.CONVERT_METASTORE_ORC.key, "false")
}

// Testing configuration with an in-memory database
def testingHiveConfig(): SparkConf = {
  val conf = new SparkConf()

  HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach {
    case (key, value) => conf.set(key, value)
  }

  conf.set(HiveUtils.HIVE_METASTORE_JARS.key, "builtin")
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.SparkSession

// Production setup
val prodConf = productionHiveConfig()
val prodSpark = SparkSession.builder()
  .config(prodConf)
  .enableHiveSupport()
  .getOrCreate()

// Development setup
val devConf = developmentHiveConfig("/opt/hive")
val devSpark = SparkSession.builder()
  .config(devConf)
  .enableHiveSupport()
  .getOrCreate()

// Testing setup
val testConf = testingHiveConfig()
val testSpark = SparkSession.builder()
  .config(testConf)
  .enableHiveSupport()
  .getOrCreate()
```

### Advanced Configuration

Advanced configuration scenarios and troubleshooting.

```scala
// Handle version conflicts by pinning the metastore version and
// letting Maven resolve matching JARs
def resolveHiveVersionConflict(
    sparkConf: SparkConf,
    targetHiveVersion: String): SparkConf = {
  sparkConf
    .set(HiveUtils.HIVE_METASTORE_VERSION.key, targetHiveVersion)
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
}

// Custom SerDe configuration
def configureCustomSerDe(
    sparkConf: SparkConf,
    serDeJars: Seq[String]): SparkConf = {
  // SparkConf.get returns a String, so split the existing
  // comma-separated value before appending
  val currentJars = sparkConf
    .get(HiveUtils.HIVE_METASTORE_JARS_PATH.key, "")
    .split(",")
    .filter(_.nonEmpty)
    .toSeq
  val allJars = currentJars ++ serDeJars

  sparkConf
    .set(HiveUtils.HIVE_METASTORE_JARS.key, "path")
    .set(HiveUtils.HIVE_METASTORE_JARS_PATH.key, allJars.mkString(","))
}
```

**Usage Example:**

```scala
// Resolve version conflicts
val conflictResolvedConf = resolveHiveVersionConflict(sparkConf, "3.1.3")

// Add custom SerDe JARs
val customSerDeJars = Seq(
  "/path/to/custom-serde.jar",
  "/path/to/another-serde.jar"
)
val serDeConf = configureCustomSerDe(sparkConf, customSerDeJars)

// Create a SparkSession with the resolved configuration
val spark = SparkSession.builder()
  .config(serDeConf)
  .enableHiveSupport()
  .getOrCreate()
```

### Monitoring and Debugging

Configuration helpers for monitoring and debugging Hive integration.

```scala
// Settings that make query plans easier to inspect while debugging.
// Note: this adjusts plan rendering and disables adaptive execution;
// it does not change log levels.
def enableHiveDebugLogging(sparkConf: SparkConf): SparkConf = {
  sparkConf
    .set("spark.sql.debug.maxToStringFields", "1000")
    .set("spark.sql.adaptive.enabled", "false") // for reproducible plans
}

// Monitor metastore connection health with a cheap metadata call
def checkMetastoreHealth(client: HiveClient): Boolean = {
  try {
    client.listDatabases("*")
    true
  } catch {
    case _: Exception => false
  }
}
```

**Usage Example:**

```scala
// Apply debugging-friendly settings
val debugConf = enableHiveDebugLogging(sparkConf)

val spark = SparkSession.builder()
  .config(debugConf)
  .enableHiveSupport()
  .getOrCreate()

// Check metastore health
val client = HiveUtils.newClientForMetadata(sparkConf, hadoopConf, Map.empty)
if (checkMetastoreHealth(client)) {
  println("Metastore connection is healthy")
} else {
  println("Metastore connection failed")
}
```