Environment management handles the initialization, configuration, and lifecycle of Spark SQL environments for the Thrift Server, ensuring proper resource allocation and cleanup.
Singleton environment manager that provides centralized Spark context and SQL context management.
private[hive] object SparkSQLEnv extends Logging {
  var sqlContext: SQLContext = _
  var sparkContext: SparkContext = _

  def init(): Unit
  def stop(): Unit
}

The init method creates and configures the Spark environment for Thrift Server operations:
Usage Example:
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
// Initialize environment (typically called by server startup)
SparkSQLEnv.init()
// Environment is now available
val sqlContext = SparkSQLEnv.sqlContext
val sparkContext = SparkSQLEnv.sparkContext

Initialization Process:
The environment automatically handles configuration from multiple sources:
val sparkConf = new SparkConf(loadDefaults = true)
// Application name resolution
val maybeAppName = sparkConf
  .getOption("spark.app.name")
  .filterNot(_ == classOf[SparkSQLCLIDriver].getName)
  .filterNot(_ == classOf[HiveThriftServer2].getName)
sparkConf.setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))

Configuration Sources:
Because the SparkConf is created with loadDefaults = true, it picks up spark.* Java system properties (including values propagated by spark-submit from spark-defaults.conf) before programmatic overrides such as setAppName are applied.
The environment ensures proper Hive integration for SQL compatibility:
val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
// Force session state initialization with correct class loader
sparkSession.sessionState
// Configure Hive metastore client
val metadataHive = sparkSession
  .sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
// Set Hive version compatibility
sparkSession.conf.set(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion)

Hive Integration Features:
The metastore client's output and error streams are redirected to the server process's stdout and stderr, and the fake Hive version setting lets JDBC clients that issue SET hive.version see the Hive version Spark was built against.
The stop method provides comprehensive cleanup of all resources:
def stop(): Unit = {
  logDebug("Shutting down Spark SQL Environment")
  // Stop the SparkContext
  if (SparkSQLEnv.sparkContext != null) {
    sparkContext.stop()
    sparkContext = null
    sqlContext = null
  }
}

Cleanup Process:
Stopping the SparkContext releases executors and cluster resources, and nulling out both references marks the environment as uninitialized so that a later init() call can create a fresh session.
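When embedding the environment directly, a JVM shutdown hook is one way to guarantee this cleanup runs; a minimal sketch (the hook is our own addition, not something SparkSQLEnv registers for you):

import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv

SparkSQLEnv.init()

// Guarantee cleanup even on unexpected JVM exit.
sys.addShutdownHook {
  SparkSQLEnv.stop()
}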
Naming Strategy:
The environment derives the application name from how it was started, using the resolution logic shown in the initialization code above: an explicitly configured spark.app.name is kept, unless it is merely the launcher class name (SparkSQLCLIDriver or HiveThriftServer2), in which case the generated default SparkSQL::<hostname> is used.
Application Name Examples:
"MyThriftServerApp""SparkSQL::worker-node-01""SparkSQL::dev-machine"The environment manages the complete lifecycle of Spark contexts:
Initialization Phase: init() builds the SparkConf, resolves the application name, creates the Hive-enabled SparkSession, and configures the metastore client.
Runtime Phase: the shared sparkContext and sqlContext serve queries for all Thrift Server sessions.
Shutdown Phase: stop() stops the SparkContext and clears both references.
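A minimal end-to-end sketch of these phases (the SHOW TABLES query is only an illustration, not part of SparkSQLEnv):

import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv

// Initialization Phase: create the shared contexts.
SparkSQLEnv.init()

// Runtime Phase: all sessions run against the same shared context.
SparkSQLEnv.sqlContext.sql("SHOW TABLES").collect().foreach(println)

// Shutdown Phase: release cluster resources and reset the singleton.
SparkSQLEnv.stop()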
Critical session state initialization ensures proper operation:
// SPARK-29604: force initialization of the session state with the Spark class loader,
// instead of having it happen during the initialization of the Hive client (which may use a
// different class loader).
sparkSession.sessionState

This prevents class loading issues that can occur when Hive clients use a different class loader.
The environment integrates configuration from various sources:
Configuration Hierarchy:
Values set programmatically on the SparkConf take precedence over spark.* system properties set by spark-submit, which in turn take precedence over entries in spark-defaults.conf.
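A small sketch of that precedence (the property value strings are illustrative):

import org.apache.spark.SparkConf

// Simulate a value arriving via spark-submit / spark-defaults.conf,
// which surfaces as a spark.* system property in the driver JVM.
System.setProperty("spark.app.name", "FromDefaults")

// loadDefaults = true picks up spark.* system properties ...
val conf = new SparkConf(loadDefaults = true)
assert(conf.get("spark.app.name") == "FromDefaults")

// ... but an explicit programmatic setting wins.
conf.setAppName("Explicit")
assert(conf.get("spark.app.name") == "Explicit")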
Specific configuration ensures Hive compatibility:
sparkSession.conf.set(HiveUtils.FAKE_HIVE_VERSION.key, HiveUtils.builtinHiveVersion)

Compatibility Features:
Clients that inspect hive.version (for example via a SET hive.version statement over JDBC) receive the Hive version that Spark was built against, so tools expecting a HiveServer2 endpoint behave consistently.
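For example, the published value can be read back from the session configuration (assuming a sparkSession in scope, as in the snippets above):

import org.apache.spark.sql.hive.HiveUtils

// Reads the Hive version that init() publishes to clients.
val reported = sparkSession.conf.get(HiveUtils.FAKE_HIVE_VERSION.key)
println(s"hive.version reported to clients: $reported")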
The environment handles various cluster deployment modes:
Local Mode:
val conf = new SparkConf().setMaster("local[*]")Standalone Cluster:
val conf = new SparkConf().setMaster("spark://master:7077")YARN Integration:
val conf = new SparkConf().setMaster("yarn").setDeployMode("cluster")Kubernetes Support:
val conf = new SparkConf().setMaster("k8s://api-server:8443")Environment initialization includes security configuration:
Environment initialization includes security configuration:

Authentication: the Thrift endpoint can require client authentication, selected through Hive settings such as hive.server2.authentication (for example KERBEROS or LDAP).
Authorization: access control is delegated to the configured Hive/Spark SQL authorization rules rather than handled by SparkSQLEnv itself.
Encryption: the transport can be protected with SSL on the Thrift endpoint or with SASL quality-of-protection settings.
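One common Kerberos setup, passed as Hive settings at launch (the principal and keytab values are placeholders):

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.authentication=KERBEROS \
  --hiveconf hive.server2.authentication.kerberos.principal=hive/_HOST@EXAMPLE.COM \
  --hiveconf hive.server2.authentication.kerberos.keytab=/etc/security/keytabs/hive.keytab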
The environment provides hooks for monitoring systems:
Metrics Collection: the shared SparkContext exposes Spark's standard metrics system, configurable through spark.metrics.conf.
Event Generation: lifecycle and query events flow through the SparkListener bus, which the Thrift Server uses to populate its JDBC/ODBC Server UI tab.
Health Checks: liveness can be verified through the Spark UI or by running a trivial query against the shared context, as sketched below.
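As an illustration, a custom listener can be attached once init() has run (the listener below is our own example, not something the environment registers):

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv

SparkSQLEnv.init()

// Example monitoring hook: log job starts and application shutdown.
SparkSQLEnv.sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
    println(s"Application ended at ${end.time}")
})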