Apache Spark SQL - Session Management

Capabilities

SparkSession Creation and Configuration

  • Create SparkSession instances using the builder pattern with flexible configuration options
  • Configure Spark applications with runtime settings, SQL configurations, and custom properties
  • Access application metadata, configuration state, and runtime information
  • Manage application lifecycle including stopping sessions gracefully

Runtime Configuration Management

  • Set and retrieve runtime configuration properties during application execution
  • Check configuration property mutability and reset configurations to defaults
  • Handle both Spark SQL specific and general Spark configuration parameters
  • Validate configuration values and handle configuration-related errors

Application Context Access

  • Access Spark application details including application ID, name, and start time
  • Retrieve SparkContext for lower-level Spark functionality when needed
  • Monitor application state and resource allocation information

API Reference

SparkSession Class

abstract class SparkSession extends Serializable with Closeable {
  // Session properties
  def sparkContext: SparkContext
  def conf: RuntimeConfig
  def version: String
  
  // Data access interfaces  
  def read: DataFrameReader
  def readStream: DataStreamReader
  def streams: StreamingQueryManager
  def catalog: Catalog
  def udf: UDFRegistration
  
  // SQL execution
  def sql(sqlText: String): DataFrame
  def table(tableName: String): DataFrame
  
  // DataFrame/Dataset creation
  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]
  
  // Session lifecycle
  def stop(): Unit
  def close(): Unit
  def newSession(): SparkSession
}
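
The class listing above includes the SQL execution and Dataset creation entry points that the usage examples below do not revisit. A minimal sketch of sql(), table(), and createDataset(), assuming a local session and a hypothetical Person case class defined only for illustration:

import org.apache.spark.sql.SparkSession

// Person is a hypothetical case class used only for this sketch
case class Person(name: String, age: Int)

val spark = SparkSession.builder()
  .appName("Session API Sketch")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._  // implicit Encoders for case classes and primitives

// createDataset builds a strongly typed Dataset from local data
val people = spark.createDataset(Seq(Person("Alice", 29), Person("Bob", 35)))
people.createOrReplaceTempView("people")

// sql() runs a query against registered views; table() loads a view or table by name
val adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")
val samePeople = spark.table("people")
adults.show()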

SparkSession.Builder Class

class Builder {
  // Basic configuration
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder  
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  
  // Advanced configuration
  def enableHiveSupport(): Builder
  def withExtensions(f: SparkSessionExtensions => Unit): Builder
  
  // Session creation
  def getOrCreate(): SparkSession
}

RuntimeConfig Class

abstract class RuntimeConfig {
  // Configuration getters
  def get(key: String): String
  def get(key: String, default: String): String
  def getOption(key: String): Option[String]
  def getAll: Map[String, String]
  
  // Configuration setters
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  
  // Configuration management
  def unset(key: String): Unit
  def isModifiable(key: String): Boolean
}

Supporting Types

SparkConf

class SparkConf(loadDefaults: Boolean = true) extends Cloneable with Serializable {
  def set(key: String, value: String): SparkConf
  def setAppName(name: String): SparkConf
  def setMaster(master: String): SparkConf
  def get(key: String): String
  def get(key: String, defaultValue: String): String
  def getOption(key: String): Option[String]
  def getAll: Array[(String, String)]
  def remove(key: String): SparkConf
}
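
A SparkConf can be inspected and trimmed before it is handed to the builder. A brief sketch of getOption, getAll, and remove, using ordinary Spark property keys:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("Conf Inspection Sketch")
  .set("spark.sql.shuffle.partitions", "64")

// getOption returns None instead of throwing when a key is absent
val serializer: Option[String] = conf.getOption("spark.serializer")

// getAll exposes every entry currently set on this conf
conf.getAll.foreach { case (key, value) => println(s"$key = $value") }

// remove drops a key; get with a default value then falls back
val trimmed = conf.remove("spark.sql.shuffle.partitions")
println(trimmed.get("spark.sql.shuffle.partitions", "200"))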

SparkContext

class SparkContext(config: SparkConf) extends Logging {
  def appName: String
  def applicationId: String
  def master: String
  def deployMode: String
  def version: String
  def startTime: Long
  def defaultParallelism: Int
  def getConf: SparkConf
  def isLocal: Boolean
  def isStopped: Boolean
  def stop(): Unit
}
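
The SparkContext is normally reached through an existing session rather than constructed directly. A short sketch of reading context-level metadata (startTime, isLocal, getConf), assuming a local session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Context Access Sketch")
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext

// startTime is epoch milliseconds; isLocal distinguishes local mode from a cluster
println(s"Started at: ${new java.util.Date(sc.startTime)}")
println(s"Running locally: ${sc.isLocal}")

// getConf returns a copy of the SparkConf the context was launched with
val appNameFromConf = sc.getConf.get("spark.app.name")
println(s"App name from conf: $appNameFromConf")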

Usage Examples

Creating a Basic SparkSession

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("My Spark Application")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/path/to/warehouse")
  .getOrCreate()

Creating SparkSession with Custom Configuration

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("Advanced Spark App")
  .setMaster("yarn")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()

Runtime Configuration Management

// Get current configuration
val currentWarehouse = spark.conf.get("spark.sql.warehouse.dir")

// Set configuration at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", true)

// Check if configuration can be modified
val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")

// Get all configurations
val allConfigs = spark.conf.getAll
allConfigs.foreach { case (key, value) => 
  println(s"$key = $value")
}

// Unset configuration (reset to default)
spark.conf.unset("spark.sql.shuffle.partitions")
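
The capabilities above also call out validating configuration values and handling configuration errors. A minimal defensive sketch, assuming the same session: getOption avoids exceptions for unset keys, and isModifiable guards against writing static configurations such as spark.sql.warehouse.dir, which cannot be changed at runtime. The key spark.myapp.checkpointDir is a hypothetical application-specific property.

// getOption returns None instead of throwing for keys with no value or default
val checkpointDir = spark.conf.getOption("spark.myapp.checkpointDir") // hypothetical key

// Guard writes with isModifiable; static configs reject runtime changes
val warehouseKey = "spark.sql.warehouse.dir"
if (spark.conf.isModifiable(warehouseKey)) {
  spark.conf.set(warehouseKey, "/new/warehouse/path")
} else {
  println(s"$warehouseKey is static and cannot be changed at runtime")
}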

Session Extensions and Custom Configuration

import org.apache.spark.sql.SparkSessionExtensions

val spark = SparkSession.builder()
  .appName("Extended Spark Session")
  .withExtensions { extensions =>
    // Register custom rules, functions, etc.
    extensions.injectResolutionRule { session =>
      // CustomResolutionRule: a user-defined Rule[LogicalPlan] (not shown here)
      new CustomResolutionRule(session)
    }
  }
  .config("spark.sql.extensions", "com.example.MySparkExtension")
  .getOrCreate()

Accessing Application Information

// Get application details
println(s"Application ID: ${spark.sparkContext.applicationId}")
println(s"Application Name: ${spark.sparkContext.appName}")
println(s"Spark Version: ${spark.version}")
println(s"Master URL: ${spark.sparkContext.master}")
println(s"Deploy Mode: ${spark.sparkContext.deployMode}")
println(s"Default Parallelism: ${spark.sparkContext.defaultParallelism}")

// Check application state
if (!spark.sparkContext.isStopped) {
  println("Application is running")
}

Managing Multiple Sessions

// Create primary session
val primarySpark = SparkSession.builder()
  .appName("Primary Session")
  .getOrCreate()

// Create isolated session with different configuration
val isolatedSpark = primarySpark.newSession()
isolatedSpark.conf.set("spark.sql.shuffle.partitions", "100")

// Both sessions share the same SparkContext but have independent configurations
println(s"Primary partitions: ${primarySpark.conf.get('spark.sql.shuffle.partitions')}")
println(s"Isolated partitions: ${isolatedSpark.conf.get('spark.sql.shuffle.partitions')}")

// Clean shutdown
isolatedSpark.close()
primarySpark.stop()

Configuration Best Practices

import org.apache.spark.sql.SparkSession

// Production-ready configuration
val spark = SparkSession.builder()
  .appName("Production Data Pipeline")
  // Resource allocation
  .config("spark.executor.memory", "4g")
  .config("spark.executor.cores", "2")
  .config("spark.executor.instances", "10")
  
  // SQL optimizations
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  
  // Serialization
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.execution.arrow.pyspark.enabled", "true")
  
  // Caching and checkpointing
  .config("spark.sql.cache.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.checkpoint.compress", "true")
  
  // Arrow batch size and partition discovery tuning
  .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")
  
  .getOrCreate()

// Set runtime configurations for specific workloads
// (isLargeDataset is an application-specific flag, e.g. derived from input size)
if (isLargeDataset) {
  spark.conf.set("spark.sql.shuffle.partitions", "400")
} else {
  spark.conf.set("spark.sql.shuffle.partitions", "200")
}