Core session management and configuration for Spark SQL applications. SparkSession serves as the main entry point providing access to all functionality, replacing the older SQLContext from Spark 1.x.
SparkSession provides a unified interface for working with structured data, covering SQL execution, batch and streaming reads, configuration, and catalog access.
/**
 * Main entry point for Spark SQL functionality
 */
class SparkSession {
  /** Runtime configuration interface */
  def conf: RuntimeConfig
  /** Catalog interface for metadata operations */
  def catalog: Catalog
  /** UDF registration interface */
  def udf: UDFRegistration
  /** Streaming query manager */
  def streams: StreamingQueryManager
  /** Execute SQL query and return DataFrame */
  def sql(sqlText: String): DataFrame
  /** Interface for reading data in batch mode */
  def read: DataFrameReader
  /** Interface for reading streaming data */
  def readStream: DataStreamReader
  /** Get table as DataFrame */
  def table(tableName: String): DataFrame
  /** Create empty DataFrame */
  def emptyDataFrame: DataFrame
  /** Create empty Dataset */
  def emptyDataset[T: Encoder]: Dataset[T]
  /** Create DataFrame from RDD of Rows */
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  /** Create DataFrame from Java List of Rows */
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  /** Create Dataset from local sequence */
  def createDataset[T: Encoder](data: Seq[T]): Dataset[T]
  /** Create Dataset with a range of numbers */
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
  /** Stop the SparkSession */
  def stop(): Unit
  /** Spark version */
  def version: String
}
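A minimal sketch of the data-creation methods above, assuming an already-running `spark` session; the User case class and column names are hypothetical:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import spark.implicits._ // Encoders for case classes and common primitives

// Dataset from a local sequence (Encoder resolved via spark.implicits._)
case class User(name: String, age: Int)
val users = spark.createDataset(Seq(User("alice", 30), User("bob", 25)))

// DataFrame from an explicit schema plus a Java list of Rows
val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
val people = spark.createDataFrame(java.util.Arrays.asList(Row("carol", 41), Row("dave", 19)), schema)

// Dataset[java.lang.Long] holding 0, 2, 4, 6, 8 split across two partitions
val ids = spark.range(0, 10, 2, 2)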
Builder pattern for creating SparkSession instances with custom configuration.
/**
 * Builder for SparkSession creation
 */
class Builder {
  /** Set application name */
  def appName(name: String): Builder
  /** Set master URL */
  def master(master: String): Builder
  /** Set configuration option */
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  /** Enable Hive support */
  def enableHiveSupport(): Builder
  /** Get or create SparkSession */
  def getOrCreate(): SparkSession
}
object SparkSession {
  /** Create new Builder */
  def builder(): Builder
  /** Get currently active SparkSession */
  def active: SparkSession
  /** Set currently active SparkSession */
  def setActiveSession(session: SparkSession): Unit
  /** Clear active SparkSession */
  def clearActiveSession(): Unit
}
Usage Example:
import org.apache.spark.sql.SparkSession
// Create SparkSession with configuration
val spark = SparkSession.builder()
  .appName("My Spark Application")
  .master("local[4]")
  .config("spark.sql.warehouse.dir", "/path/to/warehouse")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()
  .getOrCreate()
// Use the session
val df = spark.sql("SELECT * FROM my_table")
df.show()
// Clean up
spark.stop()
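The companion-object members listed earlier manage which session is considered active. A brief sketch of how they behave, assuming the `spark` session above before stop() is called:
// With a session already running, getOrCreate() hands back that same instance
val again = SparkSession.builder().getOrCreate()

// Fetch the currently active session (throws if no session exists)
val current = SparkSession.active

// Explicitly set or clear the active session for the current thread
SparkSession.setActiveSession(spark)
SparkSession.clearActiveSession()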
Interface for managing Spark configuration at runtime.
/**
 * Runtime configuration interface for Spark
 */
class RuntimeConfig {
  /** Set configuration value */
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  /** Get configuration value */
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  /** Get all configuration values */
  def getAll: Map[String, String]
  /** Remove configuration */
  def unset(key: String): Unit
  /** Check if configuration is modifiable at runtime */
  def isModifiable(key: String): Boolean
}
Usage Example:
// Configure at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", true)
// Read configuration
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
val isAdaptiveEnabled = spark.conf.get("spark.sql.adaptive.enabled", "false").toBoolean
// View all configuration
val allConfigs = spark.conf.getAll
allConfigs.foreach { case (key, value) => println(s"$key: $value") }
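The example above leaves out isModifiable and unset; a short sketch of both (the keys shown are standard Spark SQL settings):
// Only runtime-modifiable SQL settings can be changed on a live session
if (spark.conf.isModifiable("spark.sql.shuffle.partitions")) {
  spark.conf.set("spark.sql.shuffle.partitions", "50")
}
// Static settings such as spark.sql.warehouse.dir report false
println(spark.conf.isModifiable("spark.sql.warehouse.dir"))

// Remove an explicit setting so the key falls back to its default
spark.conf.unset("spark.sql.shuffle.partitions")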
Legacy entry point maintained for backward compatibility with Spark 1.x applications.
/**
 * Legacy entry point for Spark SQL (deprecated in favor of SparkSession)
 */
class SQLContext(sparkContext: SparkContext) {
  /** Underlying SparkSession */
  def sparkSession: SparkSession
  /** Execute SQL query */
  def sql(sqlText: String): DataFrame
  /** Read interface */
  def read: DataFrameReader
  /** UDF registration */
  def udf: UDFRegistration
  /** Create DataFrame */
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
}
object SQLContext {
  /** Get or create a SQLContext for the given SparkContext */
  def getOrCreate(sparkContext: SparkContext): SQLContext
}
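For legacy code that still holds a SparkContext, one possible bridge into the newer API looks like the following sketch (the `sc` value is assumed to already exist):
import org.apache.spark.sql.SQLContext

// Deprecated path kept for 1.x-era code: wrap the SparkContext, then move to SparkSession
val sqlContext = SQLContext.getOrCreate(sc)
val session = sqlContext.sparkSession
val result = session.sql("SELECT 1 AS one")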
.appName("Local Development")
.master("local[*]") // Use all available cores
.config("spark.sql.warehouse.dir", "spark-warehouse")
.getOrCreate()
import spark.implicits._ // For DataFrame operationsval spark = SparkSession.builder()
.appName("Production Application")
// Master set by spark-submit
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.sql.adaptive.skewJoin.enabled", "true")
.getOrCreate()// Check if running in cluster mode
// Check if running in cluster mode
val isCluster = spark.conf.get("spark.master").startsWith("yarn") ||
  spark.conf.get("spark.master").startsWith("mesos")
// Adjust configuration based on environment
if (isCluster) {
  spark.conf.set("spark.sql.shuffle.partitions", "400")
} else {
  spark.conf.set("spark.sql.shuffle.partitions", "4")
}