# Configuration

Configuration options, utilities, and constants for customizing Hive integration behavior, including metastore settings, file format conversion, and compatibility options.

## Capabilities

### HiveUtils Configuration Constants

Core configuration entries for Hive integration behavior.

```scala { .api }
object HiveUtils {
  /** Built-in Hive version used by Spark */
  val builtinHiveVersion: String = "1.2.1"

  /** Hive metastore version configuration */
  val HIVE_METASTORE_VERSION: ConfigEntry[String]

  /** Deprecated Hive version configuration (use HIVE_METASTORE_VERSION instead) */
  val FAKE_HIVE_VERSION: ConfigEntry[String]

  /** Location of Hive metastore JARs */
  val HIVE_METASTORE_JARS: ConfigEntry[String]

  /** Enable automatic conversion of Hive Parquet tables */
  val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]

  /** Enable schema merging for converted Parquet tables */
  val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING: ConfigEntry[Boolean]

  /** Enable automatic conversion of Hive ORC tables */
  val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]

  /** Enable conversion for CREATE TABLE AS SELECT operations */
  val CONVERT_METASTORE_CTAS: ConfigEntry[Boolean]

  /** Class prefixes shared between Spark and Hive */
  val HIVE_METASTORE_SHARED_PREFIXES: ConfigEntry[Seq[String]]

  /** Class prefixes that create barriers between Spark and Hive */
  val HIVE_METASTORE_BARRIER_PREFIXES: ConfigEntry[Seq[String]]

  /** Enable asynchronous Hive Thrift Server */
  val HIVE_THRIFT_SERVER_ASYNC: ConfigEntry[Boolean]
}
```
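
Each constant is a typed `ConfigEntry`; its `key` accessor yields the plain string name accepted by `SparkSession.Builder.config`, which the examples below rely on. A quick sketch that prints a few of those keys:

```scala
import org.apache.spark.sql.hive.HiveUtils

// Print the string keys behind the typed constants; these are the same
// keys passed to the session builder in the examples below.
Seq(
  HiveUtils.HIVE_METASTORE_VERSION,
  HiveUtils.HIVE_METASTORE_JARS,
  HiveUtils.CONVERT_METASTORE_PARQUET,
  HiveUtils.CONVERT_METASTORE_ORC
).foreach(entry => println(entry.key))
```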

### Configuration Examples

**Basic Hive Configuration:**

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

val spark = SparkSession.builder()
  .appName("Hive Configuration Example")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
  .config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")
  .enableHiveSupport()
  .getOrCreate()
```

**Advanced Configuration:**

```scala
val spark = SparkSession.builder()
  .appName("Advanced Hive Config")
  // Metastore configuration
  .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
  .config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")

  // File format conversion
  .config(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
  .config(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
  .config(HiveUtils.CONVERT_METASTORE_CTAS.key, "true")
  .config(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING.key, "false")

  // Class loading configuration
  .config(HiveUtils.HIVE_METASTORE_SHARED_PREFIXES.key,
    "com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc")
  .config(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
    "javax.jdo,org.datanucleus")

  // Thrift server configuration
  .config(HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")
  .enableHiveSupport()
  .getOrCreate()
```

### Configuration Utilities

Utility methods for configuration management.

```scala { .api }
object HiveUtils {
  /**
   * Configure a SparkContext with Hive external catalog support.
   * @param sc SparkContext to configure
   * @return the configured SparkContext with the Hive catalog
   */
  def withHiveExternalCatalog(sc: SparkContext): SparkContext

  /**
   * Check if the CLI session state is in use.
   * @return true if CLI session state is active
   */
  def isCliSessionState(): Boolean

  /**
   * Create a temporary configuration for testing.
   * @param useInMemoryDerby whether to use an in-memory Derby database
   * @return configuration map for a temporary Hive setup
   */
  def newTemporaryConfiguration(useInMemoryDerby: Boolean): Map[String, String]

  /**
   * Infer the schema for a Hive table from metastore information.
   * @param table catalog table to infer the schema for
   * @return the table with its inferred schema
   */
  def inferSchema(table: CatalogTable): CatalogTable
}
```

**Usage Examples:**

```scala
// Configure a SparkContext with Hive support
val sc = new SparkContext(conf)
val hiveEnabledSc = HiveUtils.withHiveExternalCatalog(sc)

// Create a temporary configuration for testing
val tempConfig = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
tempConfig.foreach { case (key, value) =>
  spark.conf.set(key, value)
}

// Check session state
if (HiveUtils.isCliSessionState()) {
  println("Running in CLI mode")
}
```
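
For isolated tests it is often preferable to apply the temporary configuration before the session is created, rather than mutating an existing one. A minimal sketch (the app name and master are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Build a test session backed by a throwaway in-memory Derby metastore.
val builder = SparkSession.builder()
  .appName("hive-test")   // illustrative name
  .master("local[2]")
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)
  .foreach { case (k, v) => builder.config(k, v) }
val testSpark = builder.enableHiveSupport().getOrCreate()
```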

### Metastore Configuration Options

Detailed configuration options for Hive metastore connectivity.

**Metastore Version Configuration:**

```scala
// Specify the Hive metastore version
.config("spark.sql.hive.metastore.version", "2.3.0")

// Supported versions: 0.12.0, 0.13.0, 0.14.0, 1.0.0, 1.1.0, 1.2.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0
```
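
A small guard can fail fast when an unsupported version string is configured. The set below simply mirrors the comment above; adjust it for your Spark release:

```scala
// Versions mirrored from the comment above.
val supportedMetastoreVersions = Set(
  "0.12.0", "0.13.0", "0.14.0", "1.0.0", "1.1.0",
  "1.2.0", "2.0.0", "2.1.0", "2.2.0", "2.3.0")

def requireSupportedVersion(version: String): Unit =
  require(supportedMetastoreVersions.contains(version),
    s"Unsupported Hive metastore version: $version")

requireSupportedVersion("2.3.0") // passes
```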

**Metastore JAR Configuration:**

```scala
// Option 1: Use a classpath of specific JARs
.config("spark.sql.hive.metastore.jars", "/opt/hive/lib/*")

// Option 2: Use the built-in Hive JARs (default)
.config("spark.sql.hive.metastore.jars", "builtin")

// Option 3: Download JARs from Maven
.config("spark.sql.hive.metastore.jars", "maven")
```

**Database Connection Configuration:**

```scala
// MySQL metastore
.config("javax.jdo.option.ConnectionURL",
  "jdbc:mysql://localhost:3306/hive_metastore")
.config("javax.jdo.option.ConnectionDriverName",
  "com.mysql.jdbc.Driver")
.config("javax.jdo.option.ConnectionUserName", "hive")
.config("javax.jdo.option.ConnectionPassword", "password")

// PostgreSQL metastore
.config("javax.jdo.option.ConnectionURL",
  "jdbc:postgresql://localhost:5432/hive_metastore")
.config("javax.jdo.option.ConnectionDriverName",
  "org.postgresql.Driver")
```
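
Putting the pieces together, a minimal sketch of a session wired to an external MySQL metastore. Hostname, database, and credentials are illustrative, and the MySQL JDBC driver must be on the driver classpath (see the shared prefixes section below):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative end-to-end metastore wiring; values are placeholders.
val metastoreSpark = SparkSession.builder()
  .appName("External Metastore")
  .config("spark.sql.hive.metastore.version", "2.3.0")
  .config("spark.sql.hive.metastore.jars", "/opt/hive/lib/*")
  .config("javax.jdo.option.ConnectionURL",
    "jdbc:mysql://metastore-db:3306/hive_metastore")
  .config("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver")
  .config("javax.jdo.option.ConnectionUserName", "hive")
  .config("javax.jdo.option.ConnectionPassword", "password")
  .enableHiveSupport()
  .getOrCreate()
```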

### File Format Conversion Configuration

Options for automatically reading Hive tables with Spark's optimized native file format support.

**Parquet Conversion:**

```scala
// Enable Parquet conversion (default: true)
.config("spark.sql.hive.convertMetastoreParquet", "true")

// Enable schema merging for Parquet (default: false)
.config("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false")

// Example: a query against a Hive Parquet table is converted automatically
spark.sql("SELECT * FROM hive_parquet_table").explain()
// The plan shows a Parquet file scan instead of a HiveTableRelation
```

**ORC Conversion:**

```scala
// Enable ORC conversion (default: true)
.config("spark.sql.hive.convertMetastoreOrc", "true")

// Example: an ORC table automatically uses Spark's native ORC reader
spark.sql("SELECT * FROM hive_orc_table").explain()
// The plan shows a native ORC file scan
```

**CTAS Conversion:**

```scala
// Enable conversion for CREATE TABLE AS SELECT (default: true)
.config("spark.sql.hive.convertMetastoreCtas", "true")

// Example: a Hive-serde CTAS is written with Spark's native data source
// writer instead of the Hive SerDe
spark.sql("""
  CREATE TABLE optimized_table
  STORED AS PARQUET
  AS SELECT * FROM source_table
""")
```

### Class Loading Configuration

Configuration for managing class loading between the Spark and Hive classloaders.

**Shared Prefixes:**

```scala
// Classes shared between the Spark and Hive classloaders,
// typically the JDBC drivers needed to talk to the metastore database
.config("spark.sql.hive.metastore.sharedPrefixes",
  "com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc")
```

**Barrier Prefixes:**

```scala
// Classes that should never be shared and are instead reloaded
// for each version of Hive that Spark communicates with
.config("spark.sql.hive.metastore.barrierPrefixes",
  "javax.jdo,org.datanucleus")
```
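
A sketch combining both settings through the `HiveUtils` constants: the JDBC driver is shared with the isolated Hive classloader, while DataNucleus classes stay separate. Values are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// Share the metastore JDBC driver; keep DataNucleus behind the barrier.
val isolatedSpark = SparkSession.builder()
  .appName("Classloader Isolation")
  .config(HiveUtils.HIVE_METASTORE_SHARED_PREFIXES.key, "com.mysql.jdbc")
  .config(HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key, "org.datanucleus")
  .enableHiveSupport()
  .getOrCreate()
```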

### HiveOptions for Data Sources

Configuration class for Hive-specific data source options.

```scala { .api }
/**
 * Configuration options for Hive data source operations.
 */
class HiveOptions(parameters: Map[String, String]) {
  /** File format specification (e.g., "textfile", "sequencefile") */
  val fileFormat: Option[String]

  /** Input format class name */
  val inputFormat: Option[String]

  /** Output format class name */
  val outputFormat: Option[String]

  /** SerDe class name */
  val serde: Option[String]

  /** Check whether both input and output formats are specified */
  def hasInputOutputFormat: Boolean

  /** Get SerDe properties */
  def serdeProperties: Map[String, String]
}

object HiveOptions {
  // Option key constants
  val FILE_FORMAT = "fileFormat"
  val INPUT_FORMAT = "inputFormat"
  val OUTPUT_FORMAT = "outputFormat"
  val SERDE = "serde"

  // Delimiter option mappings
  val delimiterOptions: Map[String, String]

  /**
   * Get the compression configuration for Hive writes.
   */
  def getHiveWriteCompression(
      sessionState: SessionState,
      hadoopConf: Configuration,
      compressionCodec: Option[String]): Option[String]
}
```

**Usage Examples:**

```scala
// Configure Hive data source options
val options = Map(
  "fileFormat" -> "textfile",
  "inputFormat" -> "org.apache.hadoop.mapred.TextInputFormat",
  "outputFormat" -> "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
  "serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
  "field.delim" -> "\t",
  "line.delim" -> "\n"
)

val hiveOptions = new HiveOptions(options)
println(s"Using SerDe: ${hiveOptions.serde}")
println(s"SerDe properties: ${hiveOptions.serdeProperties}")
```
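
`hasInputOutputFormat` can guard code paths that require an explicit format pair. A short sketch continuing from the options defined above:

```scala
// Fall back to the table's default layout unless both formats were given.
if (hiveOptions.hasInputOutputFormat) {
  println(s"Explicit formats: ${hiveOptions.inputFormat.get} / ${hiveOptions.outputFormat.get}")
} else {
  println("No explicit input/output format; using table defaults")
}
```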

### Performance Configuration

Configuration options for optimizing Hive integration performance.

**Execution Configuration:**

```scala
// Use Spark's native, vectorized ORC reader for converted tables
.config("spark.sql.orc.impl", "native")
.config("spark.sql.hive.convertMetastoreOrc", "true")

// Cache size, in bytes, for partition file metadata
.config("spark.sql.hive.filesourcePartitionFileCacheSize", "262144000")
```

**Metastore Performance:**

```scala
// Connection pool settings
.config("datanucleus.connectionPool.maxPoolSize", "20")
.config("datanucleus.connectionPool.minPoolSize", "5")

// Cache settings
.config("datanucleus.cache.level2.type", "none")
.config("hive.metastore.cache.pinobjtypes", "Table,Database,Type,FieldSchema,Order")
```

### Environment-Specific Configuration

Configuration patterns for different deployment environments.

**Development Configuration:**

```scala
val devSpark = SparkSession.builder()
  .appName("Development")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .config(HiveUtils.HIVE_METASTORE_VERSION.key, "1.2.1")
  .config(HiveUtils.HIVE_METASTORE_JARS.key, "builtin")
  .enableHiveSupport()
  .getOrCreate()
```

**Production Configuration:**

```scala
val prodSpark = SparkSession.builder()
  .appName("Production")
  .config("spark.sql.warehouse.dir", "hdfs://cluster/user/hive/warehouse")
  .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
  .config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://metastore-db:3306/hive")
  .config(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
  .config(HiveUtils.CONVERT_METASTORE_ORC.key, "true")
  .enableHiveSupport()
  .getOrCreate()
```
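
A common pattern selects between these profiles at startup. A sketch keyed off a hypothetical `APP_ENV` environment variable:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils

// APP_ENV is a hypothetical variable; anything but "prod" gets dev settings.
val builder = SparkSession.builder().enableHiveSupport()
val spark = (sys.env.getOrElse("APP_ENV", "dev") match {
  case "prod" =>
    builder
      .appName("Production")
      .config("spark.sql.warehouse.dir", "hdfs://cluster/user/hive/warehouse")
      .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.3.0")
      .config(HiveUtils.HIVE_METASTORE_JARS.key, "/opt/hive/lib/*")
  case _ =>
    builder
      .appName("Development")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
}).getOrCreate()
```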

## Configuration Validation

Methods for validating and troubleshooting configuration:

```scala
// Check the current configuration
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Display Hive-related configuration
spark.conf.getAll.filter(_._1.contains("hive")).foreach {
  case (key, value) => println(s"$key = $value")
}

// Verify metastore connectivity
try {
  spark.catalog.listDatabases().show()
  println("Metastore connection successful")
} catch {
  case e: Exception =>
    println(s"Metastore connection failed: ${e.getMessage}")
}

// Check conversion settings
val parquetConversion = spark.conf.get("spark.sql.hive.convertMetastoreParquet")
val orcConversion = spark.conf.get("spark.sql.hive.convertMetastoreOrc")
println(s"Parquet conversion: $parquetConversion")
println(s"ORC conversion: $orcConversion")
```