Core session management and configuration for enabling Hive support in Spark SQL sessions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveSessionStateBuilder
import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.HiveContext

The primary method to enable Hive integration in Spark SQL sessions.
/**
* Enable Hive support in SparkSession, allowing access to Hive tables and metastore
* @return Builder instance with Hive support enabled
*/
def enableHiveSupport(): Builder

Usage Example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Hive Integration")
.enableHiveSupport()
.getOrCreate()
// Now you can access Hive tables
spark.sql("SHOW TABLES").show()Key configuration options for Hive integration behavior.
/**
* Set configuration property for Hive integration
* @param key Configuration key
* @param value Configuration value
* @return Builder instance with configuration set
*/
def config(key: String, value: String): Builder

Key Configuration Properties:
// Use Hive as catalog implementation (set automatically by enableHiveSupport())
.config("spark.sql.catalogImplementation", "hive")
// Convert Parquet tables from Hive metastore
.config("spark.sql.hive.convertMetastoreParquet", "true")
// Convert ORC tables from Hive metastore
.config("spark.sql.hive.convertMetastoreOrc", "true")
// Enable conversion for partitioned table inserts
.config("spark.sql.hive.convertInsertingPartitionedTable", "true")
// Hive metastore warehouse directory
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0")
class HiveContext private[hive](sparkSession: SparkSession) extends SQLContext(sparkSession) {
/**
* Create new HiveContext instance
* @param sc SparkContext instance
*/
def this(sc: SparkContext)
/**
* Invalidate and refresh cached metadata for the given table
* @param tableName Name of table to refresh
*/
def refreshTable(tableName: String): Unit
/**
* Create new HiveContext session with separated SQLConf and temporary tables
* @returns New HiveContext instance
*/
override def newSession(): HiveContext
}
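Since HiveContext is deprecated, the equivalent operations are available directly on a Hive-enabled SparkSession. A brief sketch, with the table name purely illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// HiveContext.sql -> SparkSession.sql
spark.sql("SELECT COUNT(*) FROM sales").show()   // "sales" is a placeholder table

// HiveContext.refreshTable -> Catalog.refreshTable
spark.catalog.refreshTable("sales")

// HiveContext.newSession -> SparkSession.newSession (separate SQLConf and temporary views)
val isolated = spark.newSession()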
Configuration entries that control conversion of Hive metastore tables.

object HiveUtils {
/** Convert metastore Parquet tables to use Spark's native Parquet reader */
val CONVERT_METASTORE_PARQUET: ConfigEntry[Boolean]
/** Convert metastore ORC tables to use Spark's native ORC reader */
val CONVERT_METASTORE_ORC: ConfigEntry[Boolean]
/** Convert partitioned table inserts to use data source API */
val CONVERT_INSERTING_PARTITIONED_TABLE: ConfigEntry[Boolean]
/** Convert CREATE TABLE AS SELECT to use data source API */
val CONVERT_METASTORE_CTAS: ConfigEntry[Boolean]
/** Convert INSERT DIRECTORY to use data source API */
val CONVERT_METASTORE_INSERT_DIR: ConfigEntry[Boolean]
}
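These entries back the spark.sql.hive.* keys listed earlier. The sketch below reads and adjusts them through the runtime conf, assuming the keys are settable on a live session:

// Inspect the current conversion settings (keys correspond to the entries above)
println(spark.conf.get("spark.sql.hive.convertMetastoreParquet"))
println(spark.conf.get("spark.sql.hive.convertMetastoreOrc"))

// Fall back to the Hive SerDe for metastore Parquet tables in this session
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")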
Internal builder for creating Hive-enabled session state.

class HiveSessionStateBuilder(
session: SparkSession,
parentState: Option[SessionState]
) extends BaseSessionStateBuilder(session, parentState) {
/** Create external catalog with Hive metastore support */
override protected def externalCatalog: ExternalCatalog
/** Create session catalog with Hive integration */
override protected def catalog: SessionCatalog
/** Create analyzer with Hive-specific rules */
override protected def analyzer: Analyzer
}
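SparkSession selects this builder when spark.sql.catalogImplementation is "hive", which is what enableHiveSupport() configures. A quick sanity check, assuming the key is readable from the runtime conf and using an illustrative table name:

// Returns "hive" for a Hive-enabled session, "in-memory" otherwise
println(spark.conf.get("spark.sql.catalogImplementation"))

// With Hive session state, persistent tables are stored in the Hive metastore
spark.sql("CREATE TABLE IF NOT EXISTS demo_tbl (id INT) USING parquet")   // illustrative table
spark.catalog.listTables().show()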
Session catalog with Hive metastore integration.

private[sql] class HiveSessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
tableFunctionRegistry: TableFunctionRegistry,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader,
functionExpressionBuilder: FunctionExpressionBuilder
) extends SessionCatalog
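The session catalog resolves temporary views and Hive metastore tables through the same lookup path. A small sketch, with the view name illustrative:

// Temporary view registered only in the session catalog
spark.range(10).createOrReplaceTempView("tmp_ids")

// Both temporary views and metastore tables resolve through the session catalog
spark.sql("SELECT * FROM tmp_ids").show()
println(spark.catalog.tableExists("tmp_ids"))   // true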
Common configuration-related exceptions, such as references to missing tables, are surfaced as AnalysisException.

Example Error Handling:
import org.apache.spark.sql.AnalysisException
try {
val df = spark.sql("SELECT * FROM non_existent_table")
} catch {
case e: AnalysisException =>
println(s"Table not found: ${e.getMessage}")
}