CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-repl-2-12

Interactive Scala shell (REPL) component for Apache Spark providing real-time data processing capabilities and exploratory data analysis

Overview
Eval results
Files

docs/session-management.md

Session Management

Session management in Spark REPL handles the creation, configuration, and lifecycle of SparkSession and SparkContext instances for interactive use.

Core Session Management

Main Entry Point

object Main extends Logging {
  // Entry point for the interactive shell: parses args and starts the full REPL.
  def main(args: Array[String]): Unit
  // Creates the SparkSession used by the shell (also exposed via Main.sparkSession).
  def createSparkSession(): SparkSession
  // Package-private startup hook; lets tests inject a custom SparkILoop instance.
  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit
}

Configuration Properties

object Main {
  // Shared SparkConf; may be mutated before session creation to customize the session.
  val conf: SparkConf
  // Base directory for REPL output — presumably derived from spark.repl.classdir; confirm.
  val rootDir: String
  // Directory where REPL-compiled classes are written (spark.repl.class.outputDir).
  val outputDir: File
  // Populated during session creation; null until then.
  var sparkContext: SparkContext
  var sparkSession: SparkSession
  // The active interpreter instance.
  var interp: SparkILoop
}

Session Creation

Basic Session Creation

import org.apache.spark.repl.Main

// Create a SparkSession configured for REPL use
// (also populates Main.sparkSession / Main.sparkContext for later access)
val session = Main.createSparkSession()

// Access the associated SparkContext
val context = session.sparkContext

Programmatic REPL Startup

import org.apache.spark.repl.{Main, SparkILoop}
import scala.tools.nsc.GenericRunnerSettings

// Create custom REPL instance
val repl = new SparkILoop()
val args = Array[String]() // Command line arguments

// Start REPL with custom configuration
// NOTE(review): doMain is declared private[repl], so this call only compiles
// from code inside the org.apache.spark.repl package — confirm caller location.
Main.doMain(args, repl)

Configuration

Environment Variables

The REPL automatically detects and uses several environment variables:

// Environment variables automatically used:
// SPARK_HOME - Spark installation directory
// SPARK_EXECUTOR_URI - Custom executor URI

// Configuration properties automatically used:
// spark.repl.classdir - Custom class output directory (defaults to system temp)
// spark.repl.class.outputDir - Output directory for compiled classes

Automatic Configuration

// Default configuration applied automatically:
// setIfMissing preserves any app name the user already configured.
conf.setIfMissing("spark.app.name", "Spark shell")
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())

// Environment variable handling
if (System.getenv("SPARK_EXECUTOR_URI") != null) {
  conf.set("spark.executor.uri", System.getenv("SPARK_EXECUTOR_URI"))
}
if (System.getenv("SPARK_HOME") != null) {
  conf.setSparkHome(System.getenv("SPARK_HOME"))
}

// Conditional Hive support: "hive" is the default catalog implementation here.
val builder = SparkSession.builder.config(conf)
if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
  if (SparkSession.hiveClassesArePresent) {
    // Hive classes are on the classpath: enable the Hive catalog.
    sparkSession = builder.enableHiveSupport().getOrCreate()
  } else {
    // Hive requested but classes are missing: fall back to the in-memory catalog.
    builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
    sparkSession = builder.getOrCreate()
  }
} else {
  // Catalog implementation explicitly set to something other than "hive".
  sparkSession = builder.getOrCreate()
}

Session Access

Accessing Active Session

// From within REPL or after initialization
val currentSession = Main.sparkSession
val currentContext = Main.sparkContext

// Check if session is available (these fields are null until a session is created)
if (Main.sparkSession != null) {
  // Session is ready for use
  val df = Main.sparkSession.read.json("data.json")
}

Session Properties

// Access session configuration
// NOTE(review): assumes a session has already been created; Main.sparkSession
// is null beforehand.
val conf = Main.sparkSession.conf
val appName = conf.get("spark.app.name")

// Access SparkContext properties
val sc = Main.sparkContext
val masterId = sc.master
val appId = sc.applicationId

Lifecycle Management

Session Initialization

// Automatic initialization when starting REPL
// NOTE(review): presumably blocks until the interactive shell exits — confirm.
Main.main(Array.empty) // Starts full REPL with session

// Manual session creation (programmatic use)
val session = Main.createSparkSession()
// Session is now available via Main.sparkSession

Session Cleanup

// Automatic cleanup on REPL exit
// SparkContext.stop() is called automatically

// Manual cleanup (for programmatic use)
// Option(...) maps a never-initialized (null) context to None, so stop()
// is only invoked when a context actually exists.
Option(Main.sparkContext).foreach(_.stop())

Error Handling

Initialization Errors

// Fail fast on session-creation errors, exiting with a non-zero status.
try {
  val session = Main.createSparkSession()
} catch {
  case e: ClassNotFoundException if e.getMessage.contains("org.apache.spark.sql.connect.SparkConnectPlugin") =>
    // Handle missing Spark Connect plugin
    logError("Failed to load spark connect plugin.")
    logError("You need to build Spark with -Pconnect.")
    sys.exit(1)
    
  case e: Exception =>
    // Handle other initialization failures.
    // NOTE(review): catching Exception is broad; scala.util.control.NonFatal
    // would be the more idiomatic guard — confirm intended behavior.
    logError("Failed to initialize Spark session.", e)
    sys.exit(1)
}

Session State Validation

// True only when both the session and a live (non-stopped) context exist.
def isSessionReady: Boolean =
  Main.sparkSession != null &&
    Option(Main.sparkContext).exists(ctx => !ctx.isStopped)

// Validate session health, failing fast with a descriptive message.
// Fix: the original dereferenced Main.sparkContext.isStopped without a null
// check, so an uninitialized context produced a NullPointerException instead
// of a clear validation error.
def validateSession(): Unit = {
  require(Main.sparkSession != null, "SparkSession not initialized")
  require(Main.sparkContext != null, "SparkContext not initialized")
  require(!Main.sparkContext.isStopped, "SparkContext has been stopped")
}

Web UI Integration

Automatic UI Display

The REPL automatically displays Spark Web UI information on startup:

// Automatic output on session creation:
// "Spark context Web UI available at http://localhost:4040"
// "Spark context available as 'sc' (master = local[*], app id = app-20231201-000001)."
// "Spark session available as 'spark'."

// Handle reverse proxy configurations
// NOTE(review): assumes `sc` is the REPL-bound SparkContext shown above — confirm.
// get(..., null) returns null when spark.ui.reverseProxyUrl is unset.
val proxyUrl = sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
  println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${sc.applicationId}")
}

Advanced Configuration

Custom Spark Configuration

import org.apache.spark.SparkConf

// Modify configuration before session creation — mutations made after the
// session exists will not affect it.
Main.conf.set("spark.executor.memory", "2g")
Main.conf.set("spark.executor.cores", "2")
Main.conf.set("spark.sql.adaptive.enabled", "true")

// Then create session with custom config
val session = Main.createSparkSession()

JAR Management

// User JARs are automatically detected and processed
val jars = Utils.getLocalUserJarsForShell(conf)
  // Remove file:///, file:// or file:/ scheme if exists for each jar
  .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
  // Join with the platform path separator to form a single classpath string.
  .mkString(File.pathSeparator)

// JARs are included in interpreter classpath.
// -Yrepl-outdir points the compiler at outputDir (see Configuration Properties).
val interpArguments = List(
  "-Yrepl-class-based",
  "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
  "-classpath", jars
)

Testing Support

Test-Friendly Access

// Package-private method for testing
// Exercises the package-private doMain entry point with an injected interpreter.
class MyTest {
  def testREPL(): Unit = {
    // Interpreter under our control, plus a single-core local master.
    val interpreterUnderTest = new SparkILoop()
    val cliArgs = Array("--master", "local[1]")

    // doMain is visible here because the test lives in the repl package.
    Main.doMain(cliArgs, interpreterUnderTest)

    // Startup should have populated both globals.
    assert(Main.sparkSession != null)
    assert(Main.sparkContext != null)
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-repl-2-12

docs

index.md

interactive-shell.md

session-management.md

signaling.md

tile.json