Interactive Shell

The SparkILoop class provides the interactive shell interface for Apache Spark, extending Scala's standard REPL with Spark-specific functionality, automatic context initialization, and enhanced commands.

Core Shell Interface

SparkILoop Class

class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) extends ILoop(in0, out) {
  def this(in0: BufferedReader, out: JPrintWriter)
  def this()
  
  val initializationCommands: Seq[String]
  def initializeSpark(): Unit
  def printWelcome(): Unit
  def resetCommand(line: String): Unit
  def replay(): Unit
  def process(settings: Settings): Boolean
  def commands: List[LoopCommand]
}

Companion Object Utilities

object SparkILoop {
  def run(code: String, sets: Settings = new Settings): String
  def run(lines: List[String]): String
}

REPL Initialization

Automatic Spark Context Setup

// Initialization commands automatically executed:
val initializationCommands: Seq[String] = Seq(
  """
  @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
      org.apache.spark.repl.Main.sparkSession
    } else {
      org.apache.spark.repl.Main.createSparkSession()
    }
  @transient val sc = {
    val _sc = spark.sparkContext
    if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
      val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
      if (proxyUrl != null) {
        println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
      } else {
        println(s"Spark Context Web UI is available at Spark Master Public URL")
      }
    } else {
      _sc.uiWebUrl.foreach {
        webUrl => println(s"Spark context Web UI available at ${webUrl}")
      }
    }
    println("Spark context available as 'sc' " +
      s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
    println("Spark session available as 'spark'.")
    _sc
  }
  """,
  "import org.apache.spark.SparkContext._",
  "import spark.implicits._",
  "import spark.sql",
  "import org.apache.spark.sql.functions._"
)
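Because these commands run before the first prompt is shown, the shell starts with spark, sc, and the common imports already bound. A typical first interaction looks like this (output abbreviated; values vary per session):

scala> sc.master
res0: String = local[*]

scala> spark.range(3).toDF("n").show()
+---+
|  n|
+---+
|  0|
|  1|
|  2|
+---+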

Custom Initialization

import org.apache.spark.repl.SparkILoop
import scala.tools.nsc.GenericRunnerSettings

// Create REPL with custom settings
val repl = new SparkILoop()
val settings = new GenericRunnerSettings(msg => println(msg))

// Configure interpreter settings
settings.processArguments(List(
  "-Yrepl-class-based",
  "-classpath", "/path/to/jars"
), true)

// Start processing
repl.process(settings)
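Note that process(settings) blocks until the session ends and returns true on a clean exit. When the embedding JVM already has Spark on its classpath, setting usejavacp is often simpler than passing an explicit -classpath (a minimal sketch to apply before calling process):

// Reuse the host JVM's classpath instead of an explicit -classpath
settings.usejavacp.value = true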

Programmatic Code Execution

Single Code Block Execution

import org.apache.spark.repl.SparkILoop

// Execute Scala code and capture output
val result = SparkILoop.run("""
  val numbers = sc.parallelize(1 to 100)
  val sum = numbers.sum()
  println(s"Sum of 1 to 100: $sum")
  sum
""")

println(s"REPL output: $result")

Multi-line Code Execution

// Execute multiple related code blocks
val codeLines = List(
  "case class Person(name: String, age: Int)",
  "val people = Seq(Person(\"Alice\", 25), Person(\"Bob\", 30))",
  "val df = spark.createDataFrame(people)",
  "df.show()",
  "df.filter($\"age\" > 27).count()"
)

val output = SparkILoop.run(codeLines)

Custom Settings for Execution

import scala.tools.nsc.Settings

// Create custom interpreter settings
val settings = new Settings()
settings.classpath.value = "/custom/classpath"
settings.usejavacp.value = true

// Execute with custom settings
val result = SparkILoop.run("sc.parallelize(1 to 10).collect()", settings)

REPL Commands and Features

Built-in Commands

// Available REPL commands (inherited from standard Scala REPL):
// :help - Show available commands
// :quit - Exit the REPL
// :load <file> - Load Scala file
// :paste - Enter paste mode for multi-line input
// :reset - Reset the REPL state (preserves Spark session)
// :replay - Replay command history
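For example, :paste accepts a multi-line block and compiles it as a single unit, which is handy for companion objects and mutually dependent definitions (transcript abbreviated):

scala> :paste
// Entering paste mode (ctrl-D to finish)

case class Point(x: Int, y: Int)
val origin = Point(0, 0)

// Exiting paste mode, now interpreting.

defined class Point
origin: Point = Point(0,0)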

Spark-Specific Behavior

// Reset command preserves Spark session
repl.resetCommand("")  // the argument is the text after ":reset" at the prompt, empty here
// Prints: "Note that after :reset, state of SparkSession and SparkContext is unchanged."

// Replay reinitializes Spark context
repl.replay()
// Automatically calls initializeSpark() before replaying history

Welcome Message and UI

Custom Welcome Display

// Spark-specific welcome message shown on startup:
"""
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.6
      /_/

Using Scala 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.19)
Type in expressions to have them evaluated.
Type :help for more information.

Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1701388800000).
Spark session available as 'spark'.
"""

Web UI Information Display

// Automatic Web UI URL display with proxy support
val webUrl = sc.uiWebUrl.getOrElse("Not available")
println(s"Spark context Web UI available at $webUrl")

// Reverse proxy support
if (sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
  val proxyUrl = sc.getConf.get("spark.ui.reverseProxyUrl", null)
  if (proxyUrl != null) {
    println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${sc.applicationId}")
  }
}
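These checks read the standard Spark UI configuration keys. For a shell running behind a gateway, the relevant settings could be supplied like this (a sketch; the gateway URL is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ui.reverseProxy", "true")
  .set("spark.ui.reverseProxyUrl", "https://gateway.example.com") // placeholder URL

In practice these are usually passed at launch time via --conf rather than set programmatically.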

Advanced REPL Features

Input/Output Handling

import java.io.{BufferedReader, StringReader, OutputStreamWriter}
import scala.tools.nsc.interpreter.JPrintWriter

// Custom input/output handling
val input = new BufferedReader(new StringReader("val x = 42\nprintln(x)"))
val output = new JPrintWriter(new OutputStreamWriter(System.out), true)

val repl = new SparkILoop(input, output)
// REPL will read from input and write to output
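The REPL still needs interpreter settings before it will consume that input; a minimal sketch, assuming Spark is already on the JVM classpath:

import scala.tools.nsc.GenericRunnerSettings

val settings = new GenericRunnerSettings(msg => println(msg))
settings.usejavacp.value = true  // reuse the host JVM's classpath
repl.process(settings)           // reads the two lines from `input`, then stops at EOF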

Classpath and JAR Management

// JARs passed to the shell are normalized and added to the interpreter classpath
// (adapted from org.apache.spark.repl.Main; assumes java.io.File, java.net.URI,
// and org.apache.spark.util.Utils are in scope; file: URIs become local paths)
val jars = Utils.getLocalUserJarsForShell(conf)
  .map { jar => 
    if (jar.startsWith("file:")) new File(new URI(jar)).getPath else jar 
  }
  .mkString(File.pathSeparator)

// Classpath is configured in interpreter arguments
val interpArguments = List(
  "-Yrepl-class-based",
  "-Yrepl-outdir", outputDir.getAbsolutePath,
  "-classpath", jars
)
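The same normalization is easy to reproduce when assembling an interpreter classpath by hand; a self-contained sketch (the jar paths are placeholders):

import java.io.File
import java.net.URI

val userJars = Seq("file:/opt/libs/a.jar", "/opt/libs/b.jar")
val classpath = userJars
  .map(j => if (j.startsWith("file:")) new File(new URI(j)).getPath else j)
  .mkString(File.pathSeparator)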

Error Handling and Recovery

Initialization Error Handling

// Spark initialization error handling
def initializeSpark(): Unit = {
  if (!intp.reporter.hasErrors) {
    // Execute initialization commands
    initializationCommands.foreach(intp.quietRun)
  } else {
    throw new RuntimeException(
      "Scala interpreter encountered errors during initialization"
    )
  }
}

Runtime Error Recovery

// Graceful handling of execution errors. Compile and runtime errors in the
// submitted code are reported in the returned transcript rather than thrown;
// the try/catch guards against unexpected failures in the REPL machinery itself.
try {
  val result = SparkILoop.run("invalid.scala.code")
} catch {
  case e: Exception =>
    println(s"REPL execution failed: ${e.getMessage}")
    // The session remains usable after errors
}
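Because failures surface in the transcript, tests typically scan the returned output rather than rely on exceptions (a sketch):

val transcript = SparkILoop.run("val x: Int = \"not an int\"")
if (transcript.contains("error:")) {
  println("REPL reported a compile error")
}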

Signal Handling Integration

Interrupt Handling

The REPL integrates with Spark's signal handling for graceful job cancellation:

// Automatic setup on REPL start
Signaling.cancelOnInterrupt()

// Behavior on Ctrl+C:
// - Active Spark jobs running: cancel all active jobs, keep the REPL alive
// - No active jobs: fall back to default signal handling (terminate the REPL)
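To observe this interactively, start a deliberately slow job and press Ctrl+C once; the job is cancelled and the prompt returns (a sketch for an interactive session):

// Long-running job: one Ctrl+C cancels it without ending the session
sc.parallelize(1 to 10000000, 4).map { i => Thread.sleep(1); i }.count()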

Testing and Debugging Support

REPL State Inspection

// Access interpreter state for debugging
val interpreter = repl.intp
val hasErrors = interpreter.reporter.hasErrors

// Check if REPL is ready for input
val isReady = interpreter != null && !hasErrors

Test-Friendly Execution

// Capture REPL output for testing
import java.io.{ByteArrayOutputStream, PrintStream}

val outputStream = new ByteArrayOutputStream()
val printStream = new PrintStream(outputStream)

Console.withOut(printStream) {
  val result = SparkILoop.run("println(\"test output\")")
}

val capturedOutput = outputStream.toString
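Assertions can then target the captured stream; note that SparkILoop.run also returns the full transcript, so either channel works:

assert(capturedOutput.contains("test output"))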

Performance Considerations

Lazy Evaluation

// REPL supports Spark's lazy evaluation model
val rdd = sc.parallelize(1 to 1000000)
val filtered = rdd.filter(_ % 2 == 0)  // No computation yet
val count = filtered.count()  // Computation triggered here
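When an intermediate result will be reused across several interactive queries, caching it avoids recomputing the lineage on every action (a sketch continuing the example above):

filtered.cache()
filtered.count()  // first action computes and caches the partitions
filtered.count()  // later actions read from the cache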

Memory Management

// REPL automatically manages temporary class files
// Output directory is automatically created and managed
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")

// Cleanup happens automatically on JVM shutdown
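The parent directory is resolved from the spark.repl.classdir configuration, falling back to java.io.tmpdir; the effective location can be inspected from a running shell (a sketch mirroring the lookup in org.apache.spark.repl.Main):

val replClassDir = sc.getConf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
println(s"REPL class files are written under: $replClassDir")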