SparkSession is the unified entry point for all Spark SQL functionality. It replaces the legacy SQLContext and HiveContext with a single interface covering DataFrame creation, SQL execution, configuration management, and catalog operations.
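Because the session wraps those older entry points, code that still expects a SQLContext can be handed the one the session exposes. A minimal sketch (app name and master are illustrative):
import org.apache.spark.sql.{SQLContext, SparkSession}

val spark = SparkSession.builder()
  .appName("legacy-bridge")
  .master("local[*]")
  .getOrCreate()

// The wrapped SQLContext remains available for APIs that have not migrated yet.
val legacyCtx: SQLContext = spark.sqlContext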
object SparkSession {
def builder(): SparkSession.Builder
def getActiveSession: Option[SparkSession]
def getDefaultSession: Option[SparkSession]
def setActiveSession(session: SparkSession): Unit
def setDefaultSession(session: SparkSession): Unit
def clearActiveSession(): Unit
def clearDefaultSession(): Unit
}
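getActiveSession and getDefaultSession return Option[SparkSession] (the active session is tracked per thread), so the absence of a session is explicit. A small defensive-access sketch (the fallback app name is illustrative):
// Reuse the thread's active session if there is one, otherwise build a local fallback.
val session = SparkSession.getActiveSession.getOrElse {
  SparkSession.builder().appName("fallback-session").master("local[*]").getOrCreate()
}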
class SparkSession.Builder {
def appName(name: String): Builder
def master(master: String): Builder
def config(key: String, value: String): Builder
def config(key: String, value: Long): Builder
def config(key: String, value: Double): Builder
def config(key: String, value: Boolean): Builder
def config(conf: SparkConf): Builder
def enableHiveSupport(): Builder
def getOrCreate(): SparkSession
}

Usage Example:
import org.apache.spark.sql.SparkSession
// Create SparkSession with builder
val spark = SparkSession.builder()
.appName("My Spark Application")
.master("local[4]")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.enableHiveSupport()
.getOrCreate()
// Get existing session
val existingSession = SparkSession.getActiveSession
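Note that getOrCreate() reuses the active or default session if one already exists in the JVM, so a second builder call does not produce an independent session. A quick check (continuing with the spark value built above):
// The builder hands back the existing session; settings that would require a new
// SparkContext (such as master) are ignored with a warning.
val again = SparkSession.builder().appName("ignored-if-session-exists").getOrCreate()
assert(again eq spark)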

class SparkSession {
def sql(sqlText: String): DataFrame
def read: DataFrameReader
def readStream: DataStreamReader
def catalog: Catalog
def conf: RuntimeConfig
def sparkContext: SparkContext
def version: String
def sessionState: SessionState
def sharedState: SharedState
def stop(): Unit
def close(): Unit
}

Usage Examples:
// Execute SQL queries
val result = spark.sql("SELECT * FROM my_table WHERE age > 25")
result.show()
// Access configuration
spark.conf.set("spark.sql.shuffle.partitions", "200")
val partitions = spark.conf.get("spark.sql.shuffle.partitions")
// Get Spark version
println(s"Spark version: ${spark.version}")
// Stop session
spark.stop()
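The catalog handle listed above is the entry point for metadata queries. A minimal sketch, assuming an active session and a table named my_table in the current database:
// Inspect the databases and tables registered in the catalog.
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()

// Check for a specific table before querying it.
if (spark.catalog.tableExists("my_table")) {
  spark.table("my_table").printSchema()
}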

class SparkSession {
def table(tableName: String): DataFrame
def range(end: Long): Dataset[java.lang.Long]
def range(start: Long, end: Long): Dataset[java.lang.Long]
def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]
def emptyDataFrame: DataFrame
def emptyDataset[T : Encoder]: Dataset[T]
}

Usage Examples:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Encoders for common Scala types (needed by createDataset below)
import spark.implicits._
// Create DataFrame from sequence
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 25), Person("Bob", 30))
val peopleDF = spark.createDataFrame(people)
// Create DataFrame with explicit schema
val schema = StructType(Array(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val rows = Seq(Row("Charlie", 35), Row("Diana", 28))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
// Create Dataset with type safety
val names = Seq("Alice", "Bob", "Charlie")
val namesDS = spark.createDataset(names)
// Create range Dataset
val numbers = spark.range(1, 1000000, 2, numPartitions = 100)
// Load existing table
val tableDF = spark.table("my_database.my_table")
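emptyDataFrame and emptyDataset are useful when a downstream step expects a Dataset with a known schema but no rows. A short sketch reusing the Person case class and the spark.implicits._ import above:
// Zero rows, but the Person schema is preserved for later unions or joins.
val noPeople = spark.emptyDataset[Person]
noPeople.printSchema()

// A DataFrame with no rows and no columns.
val nothing = spark.emptyDataFrame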

class RuntimeConfig {
def set(key: String, value: String): Unit
def set(key: String, value: Boolean): Unit
def set(key: String, value: Long): Unit
def get(key: String): String
def get(key: String, default: String): String
def getOption(key: String): Option[String]
def unset(key: String): Unit
def getAll: Map[String, String]
def isModifiable(key: String): Boolean
}

Common Configuration Properties:
// Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")
// Broadcast join threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
// Dynamic allocation and serialization are core Spark settings, not SQL runtime
// configs; spark.conf.set cannot change them once the SparkContext is running
// (recent Spark versions reject such keys). Set them on the builder or via
// spark-submit instead, for example:
//   .config("spark.dynamicAllocation.enabled", "true")
//   .config("spark.dynamicAllocation.minExecutors", "1")
//   .config("spark.dynamicAllocation.maxExecutors", "20")
//   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Check if property is modifiable
val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")
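The read side of RuntimeConfig is just as useful. A short sketch of getOption, getAll, and unset:
// getOption avoids an exception when a key has never been set.
val threshold: Option[String] = spark.conf.getOption("spark.sql.autoBroadcastJoinThreshold")

// Dump every effective setting, e.g. for logging at startup.
spark.conf.getAll.foreach { case (k, v) => println(s"$k = $v") }

// Revert a key to its default value.
spark.conf.unset("spark.sql.shuffle.partitions")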

// Multiple sessions (advanced usage)
val session1 = SparkSession.builder()
.appName("Session1")
.master("local[2]")
.getOrCreate()
// A second builder call would return session1 again; for an isolated session
// (its own SQL conf, temp views, and UDFs) on the same SparkContext, use newSession()
val session2 = session1.newSession()
// Set active session
SparkSession.setActiveSession(session1)
// Clean shutdown: stop() halts the shared SparkContext, so one call is enough
session1.stop()
// Or use a loan-pattern helper that always stops the session
def withSparkSession[T](appName: String)(f: SparkSession => T): T = {
val spark = SparkSession.builder()
.appName(appName)
.master("local[*]")
.getOrCreate()
try {
f(spark)
} finally {
spark.stop()
}
}
// Usage
val result = withSparkSession("MyApp") { spark =>
spark.sql("SELECT 1 as test").collect()
}
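Since SparkSession implements Closeable, scala.util.Using (Scala 2.13+) gives the same guarantee as the helper above. A sketch (app name is illustrative):
import scala.util.Using
import org.apache.spark.sql.SparkSession

// Using.resource stops the session via close() even if the body throws.
val count = Using.resource(
  SparkSession.builder().appName("using-demo").master("local[*]").getOrCreate()
) { spark =>
  spark.sql("SELECT 1 AS test").count()
}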

// Session-specific state (per SparkSession)
trait SessionState {
def catalog: SessionCatalog
def analyzer: Analyzer
def optimizer: Optimizer
def planner: SparkPlanner
def conf: SQLConf
}
// Shared state (across SparkSessions in same SparkContext)
trait SharedState {
def sparkContext: SparkContext
def externalCatalog: ExternalCatalog
def globalTempViewManager: GlobalTempViewManager
def cacheManager: CacheManager
}

The session and shared state provide access to internal Spark SQL components, primarily used for advanced use cases and debugging.
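One piece of shared state that surfaces in everyday code is the GlobalTempViewManager: global temporary views are visible to every session on the same SparkContext. A small sketch, assuming two sessions created as in the multi-session example above:
// Register a global temp view in one session...
session1.range(5).createOrReplaceGlobalTempView("shared_numbers")

// ...and read it from another through the reserved global_temp database.
session2.sql("SELECT * FROM global_temp.shared_numbers").show()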
// Enable Hive support during session creation
val spark = SparkSession.builder()
.appName("Hive Integration")
.enableHiveSupport()
.getOrCreate()
// Access Hive tables
val hiveTable = spark.table("hive_database.hive_table")
// Execute HiveQL
val result = spark.sql("SHOW TABLES IN hive_database")
// Hive SerDes and storage formats are declared when the table is created;
// the "hive" data source cannot read file paths directly via load()
spark.sql("""
  CREATE TABLE IF NOT EXISTS hive_database.text_table (key INT, value STRING)
  USING hive
  OPTIONS (
    inputFormat 'org.apache.hadoop.mapred.TextInputFormat',
    outputFormat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
    serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  )
""")
val df = spark.table("hive_database.text_table")
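Writing back is symmetric: with Hive support enabled, DataFrameWriter.saveAsTable persists a DataFrame as a table in the metastore. A brief sketch (the target table name is illustrative):
// Persist the DataFrame as a managed table registered in the Hive metastore.
df.write
  .mode("overwrite")
  .saveAsTable("hive_database.text_table_copy")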