Session Management

SparkSession is the unified entry point for all Spark SQL functionality. It provides access to DataFrames, SQL execution, configuration management, and catalog operations. SparkSession replaces the legacy SQLContext and HiveContext, offering a single interface for all Spark SQL operations.
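
For code written against the legacy entry points, both contexts remain reachable from an existing session. A minimal sketch, assuming a session named spark is already in scope:

// Legacy entry points remain available on SparkSession, useful when migrating old code
val sqlContext = spark.sqlContext   // SQLContext backed by this session
val sc = spark.sparkContext         // underlying SparkContext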

SparkSession Creation

object SparkSession {
  def builder(): SparkSession.Builder
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def clearDefaultSession(): Unit
}

class SparkSession.Builder {
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}

Usage Example:

import org.apache.spark.sql.SparkSession

// Create SparkSession with builder
val spark = SparkSession.builder()
  .appName("My Spark Application")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()
  .getOrCreate()

// Get the session active on the current thread, if any (returns Option[SparkSession])
val existingSession = SparkSession.getActiveSession
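
getOrCreate() first returns the session active on the current thread, then the JVM-wide default session, and only builds a new one if neither exists. A small sketch of handling the optional result explicitly:

// Handle the optional active session
SparkSession.getActiveSession match {
  case Some(active) => println(s"Active session for app: ${active.sparkContext.appName}")
  case None         => println("No active SparkSession on this thread")
}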

Core Session Operations

class SparkSession {
  def sql(sqlText: String): DataFrame
  def read: DataFrameReader
  def readStream: DataStreamReader
  def catalog: Catalog
  def conf: RuntimeConfig
  def sparkContext: SparkContext
  def version: String
  def sessionState: SessionState
  def sharedState: SharedState
  def stop(): Unit
  def close(): Unit
}

Usage Examples:

// Execute SQL queries
val result = spark.sql("SELECT * FROM my_table WHERE age > 25")
result.show()

// Access configuration
spark.conf.set("spark.sql.shuffle.partitions", "200")
val partitions = spark.conf.get("spark.sql.shuffle.partitions")

// Get Spark version
println(s"Spark version: ${spark.version}")

// Stop session
spark.stop()
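
The read and catalog handles listed above follow the same pattern and are covered in their own docs; a brief illustrative sketch (the CSV path and options here are assumptions, not from the source):

// Batch read through the DataFrameReader
val usersDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/users.csv")   // hypothetical path

// Inspect the catalog
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()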

Data Creation Methods

class SparkSession {
  def table(tableName: String): DataFrame
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
  
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]
  
  def emptyDataFrame: DataFrame
  def emptyDataset[T : Encoder]: Dataset[T]
}

Usage Examples:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Create DataFrame from sequence
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 25), Person("Bob", 30))
val peopleDF = spark.createDataFrame(people)

// Create DataFrame with explicit schema
val schema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val rows = Seq(Row("Charlie", 35), Row("Diana", 28))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Create Dataset with type safety (the String encoder comes from spark.implicits)
import spark.implicits._
val names = Seq("Alice", "Bob", "Charlie")
val namesDS = spark.createDataset(names)

// Create range Dataset
val numbers = spark.range(1, 1000000, 2, numPartitions = 100)

// Load existing table
val tableDF = spark.table("my_database.my_table")
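
The empty factories in the listing are handy as neutral starting points, for example as the seed of a fold over per-partition results. A minimal sketch, reusing the Person case class from above:

// Empty containers with a known schema / type (Encoder[Person] comes from spark.implicits._)
import spark.implicits._
val emptyDF = spark.emptyDataFrame            // zero columns, zero rows
val emptyPeople = spark.emptyDataset[Person]  // typed, zero rows
println(emptyPeople.schema.treeString)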

Session Configuration

class RuntimeConfig {
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  def get(key: String): String
  def get(key: String, default: String): String
  def getOption(key: String): Option[String]
  def unset(key: String): Unit
  def getAll: Map[String, String]
  def isModifiable(key: String): Boolean
}

Common Configuration Properties:

// Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

// Broadcast join threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

// Dynamic allocation and the serializer are core Spark configs, not SQL runtime
// configs; once the session is running, spark.conf.set either rejects them or
// leaves them without effect. Supply them when the session is first built
// (or via spark-submit --conf):
val tunedSpark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// Check if property is modifiable
val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")
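
getOption, getAll, and unset from the RuntimeConfig listing round out the picture; a short sketch (the keys are the same illustrative ones used above):

// Inspect and reset runtime configuration
val threshold = spark.conf.getOption("spark.sql.autoBroadcastJoinThreshold")  // Option[String]
spark.conf.getAll
  .filter { case (key, _) => key.startsWith("spark.sql.adaptive") }
  .foreach { case (key, value) => println(s"$key = $value") }
spark.conf.unset("spark.sql.shuffle.partitions")  // fall back to the default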

Session Lifecycle Management

// Multiple sessions sharing one SparkContext (advanced usage)
val session1 = SparkSession.builder()
  .appName("Session1")
  .master("local[2]")
  .getOrCreate()

// A second builder.getOrCreate() would return the same instance; newSession() creates
// an isolated session (own SQL conf, temp views, UDFs) on the same SparkContext
val session2 = session1.newSession()

// Set the active session for the current thread
SparkSession.setActiveSession(session1)

// Clean shutdown: stop() stops the shared SparkContext, ending both sessions
session1.stop()

// Or wrap the session lifetime in a loan-pattern helper
def withSparkSession[T](appName: String)(f: SparkSession => T): T = {
  val spark = SparkSession.builder()
    .appName(appName)
    .master("local[*]")
    .getOrCreate()
  try {
    f(spark)
  } finally {
    spark.stop()
  }
}

// Usage
val result = withSparkSession("MyApp") { spark =>
  spark.sql("SELECT 1 as test").collect()
}
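
SparkSession also implements java.io.Closeable (close() above behaves like stop()), so on Scala 2.13 scala.util.Using can replace the custom helper; a sketch under that assumption:

import scala.util.Using

// Using.resource closes (stops) the session even if the body throws
val rows = Using.resource(
  SparkSession.builder().appName("MyApp").master("local[*]").getOrCreate()
) { spark =>
  spark.sql("SELECT 1 AS test").collect()
}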

Session State and Shared State

// Session-specific state (per SparkSession)
trait SessionState {
  def catalog: SessionCatalog
  def analyzer: Analyzer
  def optimizer: Optimizer
  def planner: SparkPlanner
  def conf: SQLConf
}

// Shared state (across SparkSessions in same SparkContext)
trait SharedState {
  def sparkContext: SparkContext
  def externalCatalog: ExternalCatalog
  def globalTempViewManager: GlobalTempViewManager
  def cacheManager: CacheManager
}

The sessionState and sharedState handles expose internal Spark SQL components; they are intended for advanced use cases and debugging rather than routine application code.
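
One user-visible consequence of this split is view scoping: regular temporary views live in session state, while global temporary views are managed by the shared state's GlobalTempViewManager and are visible across sessions. A minimal sketch:

// Regular temp views are per-session; global temp views are shared
val spark2 = spark.newSession()

spark.range(5).createOrReplaceTempView("local_view")         // only visible in `spark`
spark.range(5).createOrReplaceGlobalTempView("shared_view")  // visible to all sessions

spark2.sql("SELECT * FROM global_temp.shared_view").show()   // works
// spark2.sql("SELECT * FROM local_view")                    // fails: table not found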

Integration with Hive

// Enable Hive support during session creation
val spark = SparkSession.builder()
  .appName("Hive Integration")
  .enableHiveSupport()
  .getOrCreate()

// Access Hive tables
val hiveTable = spark.table("hive_database.hive_table")

// Execute HiveQL
val result = spark.sql("SHOW TABLES IN hive_database")

// Hive SerDes and file formats are declared through DDL; the "hive" data source
// cannot load files directly with spark.read
spark.sql("""
  CREATE TABLE IF NOT EXISTS hive_database.text_table (key STRING, value STRING)
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""")
val df = spark.table("hive_database.text_table")