The SparkILoop class provides the interactive shell interface for Apache Spark, extending Scala's standard REPL with Spark-specific functionality, automatic context initialization, and enhanced commands.
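As a quick preview of the programmatic entry point documented below, the following sketch uses the companion object's run helper (declared in the API outline that follows) to evaluate a snippet and capture the session transcript; the snippet and its output are purely illustrative.
import org.apache.spark.repl.SparkILoop

// Evaluate an illustrative one-line snippet in a fresh REPL session; run
// returns whatever the interpreter printed (the session transcript) as a String.
val transcript: String = SparkILoop.run("println(\"hello from the Spark REPL\")")
println(transcript)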
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) extends ILoop(in0, out) {
  def this(in0: BufferedReader, out: JPrintWriter)
  def this()

  val initializationCommands: Seq[String]

  def initializeSpark(): Unit
  def printWelcome(): Unit
  def resetCommand(line: String): Unit
  def replay(): Unit
  def process(settings: Settings): Boolean
  def commands: List[LoopCommand]
}

object SparkILoop {
  def run(code: String, sets: Settings = new Settings): String
  def run(lines: List[String]): String
}

// Initialization commands automatically executed:
val initializationCommands: Seq[String] = Seq(
"""
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
org.apache.spark.repl.Main.sparkSession
} else {
org.apache.spark.repl.Main.createSparkSession()
}
@transient val sc = {
val _sc = spark.sparkContext
if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
} else {
println(s"Spark Context Web UI is available at Spark Master Public URL")
}
} else {
_sc.uiWebUrl.foreach {
webUrl => println(s"Spark context Web UI available at ${webUrl}")
}
}
println("Spark context available as 'sc' " +
s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
println("Spark session available as 'spark'.")
_sc
}
""",
"import org.apache.spark.SparkContext._",
"import spark.implicits._",
"import spark.sql",
"import org.apache.spark.sql.functions._"
)import org.apache.spark.repl.SparkILoop
import scala.tools.nsc.GenericRunnerSettings
// Create REPL with custom settings
val repl = new SparkILoop()
val settings = new GenericRunnerSettings(msg => println(msg))
// Configure interpreter settings
settings.processArguments(List(
"-Yrepl-class-based",
"-classpath", "/path/to/jars"
), true)
// Start processing
repl.process(settings)

import org.apache.spark.repl.SparkILoop
// Execute Scala code and capture output
val result = SparkILoop.run("""
val numbers = sc.parallelize(1 to 100)
val sum = numbers.sum()
println(s"Sum of 1 to 100: $sum")
sum
""")
println(s"REPL output: $result")// Execute multiple related code blocks
val codeLines = List(
"case class Person(name: String, age: Int)",
"val people = Seq(Person(\"Alice\", 25), Person(\"Bob\", 30))",
"val df = spark.createDataFrame(people)",
"df.show()",
"df.filter($\"age\" > 27).count()"
)
val output = SparkILoop.run(codeLines)

import scala.tools.nsc.Settings
// Create custom interpreter settings
val settings = new Settings()
settings.classpath.value = "/custom/classpath"
settings.usejavacp.value = true
// Execute with custom settings
val result = SparkILoop.run("sc.parallelize(1 to 10).collect()", settings)// Available REPL commands (inherited from standard Scala REPL):
// :help - Show available commands
// :quit - Exit the REPL
// :load <file> - Load Scala file
// :paste - Enter paste mode for multi-line input
// :reset - Reset the REPL state (preserves Spark session)
// :replay - Replay command history// Reset command preserves Spark session
repl.resetCommand(":reset")
// Output: "Note that after :reset, state of SparkSession and SparkContext is unchanged."
// Replay reinitializes Spark context
repl.replay()
// Automatically calls initializeSpark() before replaying history

// Spark-specific welcome message shown on startup:
"""
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.6
      /_/
Using Scala 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.19)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = app-20231201-000001).
Spark session available as 'spark'.
"""// Automatic Web UI URL display with proxy support
val webUrl = sc.uiWebUrl.getOrElse("Not available")
println(s"Spark context Web UI available at $webUrl")
// Reverse proxy support
if (sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
  val proxyUrl = sc.getConf.get("spark.ui.reverseProxyUrl", null)
  if (proxyUrl != null) {
    println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${sc.applicationId}")
  }
}

import java.io.{BufferedReader, StringReader, OutputStreamWriter}
import scala.tools.nsc.interpreter.JPrintWriter
// Custom input/output handling
val input = new BufferedReader(new StringReader("val x = 42\nprintln(x)"))
val output = new JPrintWriter(new OutputStreamWriter(System.out), true)
val repl = new SparkILoop(input, output)
// REPL will read from input and write to output

// JARs are automatically processed and added to classpath
// File URLs are normalized (file:// schemes removed)
val jars = Utils.getLocalUserJarsForShell(conf)
  .map { jar =>
    if (jar.startsWith("file:")) new File(new URI(jar)).getPath else jar
  }
  .mkString(File.pathSeparator)
// Classpath is configured in interpreter arguments
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", outputDir.getAbsolutePath,
"-classpath", jars
)

// Spark initialization error handling
def initializeSpark(): Unit = {
  if (!intp.reporter.hasErrors) {
    // Execute initialization commands
    initializationCommands.foreach(intp.quietRun)
  } else {
    throw new RuntimeException(
      "Scala interpreter encountered errors during initialization"
    )
  }
}

// Graceful handling of execution errors
try {
  val result = SparkILoop.run("invalid.scala.code")
} catch {
  case e: Exception =>
    println(s"REPL execution failed: ${e.getMessage}")
    // REPL continues to be usable after errors
}

The REPL integrates with Spark's signal handling for graceful job cancellation:
// Automatic setup on REPL start
Signaling.cancelOnInterrupt()
// Behavior on Ctrl+C:
// 1. First Ctrl+C: Cancel active Spark jobs
// 2. Second Ctrl+C: Terminate REPL process

// Access interpreter state for debugging
val interpreter = repl.intp
val hasErrors = interpreter.reporter.hasErrors
// Check if REPL is ready for input
val isReady = interpreter != null && !hasErrors

// Capture REPL output for testing
import java.io.{ByteArrayOutputStream, PrintStream}
val outputStream = new ByteArrayOutputStream()
val printStream = new PrintStream(outputStream)
Console.withOut(printStream) {
  val result = SparkILoop.run("println(\"test output\")")
}
val capturedOutput = outputStream.toString

// REPL supports Spark's lazy evaluation model
val rdd = sc.parallelize(1 to 1000000)
val filtered = rdd.filter(_ % 2 == 0) // No computation yet
val count = filtered.count() // Computation triggered here

// REPL automatically manages temporary class files
// Output directory is automatically created and managed
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
// Cleanup happens automatically on JVM shutdown
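When SparkILoop is embedded rather than launched through spark-shell, a similar arrangement can be reproduced by hand. The sketch below is an assumption-based illustration: it creates its own temporary directory with the JDK's Files API and passes it through the -Yrepl-outdir flag shown earlier; the directory name, flag combination, and best-effort cleanup are illustrative wiring, not Spark's internal code.
import java.nio.file.Files
import org.apache.spark.repl.SparkILoop
import scala.tools.nsc.GenericRunnerSettings

// Hypothetical embedded setup: keep REPL-generated class files in a
// directory we create ourselves (name is illustrative).
val replOutputDir = Files.createTempDirectory("embedded-repl").toFile
replOutputDir.deleteOnExit()  // best-effort cleanup; spark-shell's own temp dirs are removed for you

val settings = new GenericRunnerSettings(msg => println(msg))
settings.processArguments(List(
  "-Yrepl-class-based",
  "-Yrepl-outdir", replOutputDir.getAbsolutePath,
  "-usejavacp"
), true)

// Blocks until the interactive session ends.
val repl = new SparkILoop()
repl.process(settings)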