Interactive Scala shell (REPL) component for Apache Spark providing real-time data processing capabilities and exploratory data analysis
Session management in Spark REPL handles the creation, configuration, and lifecycle of SparkSession and SparkContext instances for interactive use.
object Main extends Logging {
def main(args: Array[String]): Unit
def createSparkSession(): SparkSession
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit
}

object Main {
val conf: SparkConf
val rootDir: String
val outputDir: File
var sparkContext: SparkContext
var sparkSession: SparkSession
var interp: SparkILoop
}

import org.apache.spark.repl.Main
// Create a SparkSession configured for REPL use
val session = Main.createSparkSession()
// Access the associated SparkContext
val context = session.sparkContext

import org.apache.spark.repl.{Main, SparkILoop}
import scala.tools.nsc.GenericRunnerSettings
// Create custom REPL instance
val repl = new SparkILoop()
val args = Array[String]() // Command line arguments
// Start REPL with custom configuration
Main.doMain(args, repl)

The REPL automatically detects and uses several environment variables:
// Environment variables automatically used:
// SPARK_HOME - Spark installation directory
// SPARK_EXECUTOR_URI - Custom executor URI
// Configuration properties automatically used:
// spark.repl.classdir - Custom class output directory (defaults to system temp)
// spark.repl.class.outputDir - Output directory for compiled classes

// Default configuration applied automatically:
conf.setIfMissing("spark.app.name", "Spark shell")
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
// Environment variable handling
if (System.getenv("SPARK_EXECUTOR_URI") != null) {
conf.set("spark.executor.uri", System.getenv("SPARK_EXECUTOR_URI"))
}
if (System.getenv("SPARK_HOME") != null) {
conf.setSparkHome(System.getenv("SPARK_HOME"))
}
// Conditional Hive support
val builder = SparkSession.builder.config(conf)
if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
if (SparkSession.hiveClassesArePresent) {
sparkSession = builder.enableHiveSupport().getOrCreate()
} else {
builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
sparkSession = builder.getOrCreate()
}
} else {
sparkSession = builder.getOrCreate()
}

// From within REPL or after initialization
val currentSession = Main.sparkSession
val currentContext = Main.sparkContext
// Check if session is available
if (Main.sparkSession != null) {
// Session is ready for use
val df = Main.sparkSession.read.json("data.json")
}

// Access session configuration
val conf = Main.sparkSession.conf
val appName = conf.get("spark.app.name")
// Access SparkContext properties
val sc = Main.sparkContext
val masterId = sc.master
val appId = sc.applicationId

// Automatic initialization when starting REPL
Main.main(Array.empty) // Starts full REPL with session
// Manual session creation (programmatic use)
val session = Main.createSparkSession()
// Session is now available via Main.sparkSession

// Automatic cleanup on REPL exit
// SparkContext.stop() is called automatically
// Manual cleanup (for programmatic use)
Option(Main.sparkContext).foreach(_.stop())

try {
val session = Main.createSparkSession()
} catch {
case e: ClassNotFoundException if e.getMessage.contains("org.apache.spark.sql.connect.SparkConnectPlugin") =>
// Handle missing Spark Connect plugin
logError("Failed to load spark connect plugin.")
logError("You need to build Spark with -Pconnect.")
sys.exit(1)
case e: Exception =>
// Handle other initialization failures
logError("Failed to initialize Spark session.", e)
sys.exit(1)
}

// Check if session is properly initialized
/** Returns true when both the SparkSession and SparkContext have been
  * initialized and the context has not yet been stopped. */
def isSessionReady: Boolean =
  Main.sparkSession != null &&
    Option(Main.sparkContext).exists(ctx => !ctx.isStopped)
// Validate session health
def validateSession(): Unit = {
require(Main.sparkSession != null, "SparkSession not initialized")
require(!Main.sparkContext.isStopped, "SparkContext has been stopped")
}

The REPL automatically displays Spark Web UI information on startup:
// Automatic output on session creation:
// "Spark context Web UI available at http://localhost:4040"
// "Spark context available as 'sc' (master = local[*], app id = app-20231201-000001)."
// "Spark session available as 'spark'."
// Handle reverse proxy configurations
val proxyUrl = sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${sc.applicationId}")
}

import org.apache.spark.SparkConf
// Modify configuration before session creation
Main.conf.set("spark.executor.memory", "2g")
Main.conf.set("spark.executor.cores", "2")
Main.conf.set("spark.sql.adaptive.enabled", "true")
// Then create session with custom config
val session = Main.createSparkSession()

// User JARs are automatically detected and processed
val jars = Utils.getLocalUserJarsForShell(conf)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
.mkString(File.pathSeparator)
// JARs are included in interpreter classpath
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", jars
)

// Package-private method for testing
class MyTest {
def testREPL(): Unit = {
val mockInterp = new SparkILoop()
val args = Array("--master", "local[1]")
// Use test-visible doMain method
Main.doMain(args, mockInterp)
// Verify session state
assert(Main.sparkSession != null)
assert(Main.sparkContext != null)
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-repl-2-12