# Interactive Shell

The `SparkILoop` class provides the interactive shell interface for Apache Spark, extending Scala's standard REPL (`ILoop`) with Spark-specific functionality, automatic context initialization, and enhanced commands.

## Core Shell Interface

### SparkILoop Class

```scala { .api }
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) extends ILoop(in0, out) {
  def this(in0: BufferedReader, out: JPrintWriter)
  def this()

  val initializationCommands: Seq[String]
  def initializeSpark(): Unit
  def printWelcome(): Unit
  def resetCommand(line: String): Unit
  def replay(): Unit
  def process(settings: Settings): Boolean
  def commands: List[LoopCommand]
}
```

### Companion Object Utilities

```scala { .api }
object SparkILoop {
  def run(code: String, sets: Settings = new Settings): String
  def run(lines: List[String]): String
}
```

## REPL Initialization

### Automatic Spark Context Setup

```scala
// Initialization commands executed automatically on startup:
val initializationCommands: Seq[String] = Seq(
  """
  @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
      org.apache.spark.repl.Main.sparkSession
    } else {
      org.apache.spark.repl.Main.createSparkSession()
    }
  @transient val sc = {
    val _sc = spark.sparkContext
    if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
      val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
      if (proxyUrl != null) {
        println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
      } else {
        println(s"Spark Context Web UI is available at Spark Master Public URL")
      }
    } else {
      _sc.uiWebUrl.foreach {
        webUrl => println(s"Spark context Web UI available at ${webUrl}")
      }
    }
    println("Spark context available as 'sc' " +
      s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
    println("Spark session available as 'spark'.")
    _sc
  }
  """,
  "import org.apache.spark.SparkContext._",
  "import spark.implicits._",
  "import spark.sql",
  "import org.apache.spark.sql.functions._"
)
```

### Custom Initialization

```scala
import org.apache.spark.repl.SparkILoop
import scala.tools.nsc.GenericRunnerSettings

// Create a REPL with custom settings
val repl = new SparkILoop()
val settings = new GenericRunnerSettings(msg => println(msg))

// Configure interpreter settings
settings.processArguments(List(
  "-Yrepl-class-based",
  "-classpath", "/path/to/jars"
), processAll = true)

// Start processing
repl.process(settings)
```

## Programmatic Code Execution

### Single Code Block Execution

```scala
import org.apache.spark.repl.SparkILoop

// Execute Scala code and capture the REPL transcript
val result = SparkILoop.run("""
  val numbers = sc.parallelize(1 to 100)
  val sum = numbers.sum()
  println(s"Sum of 1 to 100: $sum")
  sum
""")

println(s"REPL output: $result")
```

### Multi-line Code Execution

```scala
// Execute multiple related code blocks
val codeLines = List(
  "case class Person(name: String, age: Int)",
  "val people = Seq(Person(\"Alice\", 25), Person(\"Bob\", 30))",
  "val df = spark.createDataFrame(people)",
  "df.show()",
  "df.filter($\"age\" > 27).count()"
)

val output = SparkILoop.run(codeLines)
```

### Custom Settings for Execution

```scala
import scala.tools.nsc.Settings

// Create custom interpreter settings
val settings = new Settings()
settings.classpath.value = "/custom/classpath"
settings.usejavacp.value = true

// Execute with custom settings
val result = SparkILoop.run("sc.parallelize(1 to 10).collect()", settings)
```

## REPL Commands and Features

### Built-in Commands

```scala
// Commands inherited from the standard Scala REPL:
// :help          - Show available commands
// :quit          - Exit the REPL
// :load <file>   - Load and interpret a Scala file
// :paste         - Enter paste mode for multi-line input
// :reset         - Reset the REPL state (preserves the Spark session)
// :replay        - Replay the command history
```

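For multi-line definitions, paste mode avoids line-by-line interpretation of normal input. A session might look like the following transcript (the prompts, data, and `Point` class are illustrative, not captured from a real run):

```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

case class Point(x: Int, y: Int)
val points = sc.parallelize(Seq(Point(1, 2), Point(3, 4)))

// Exiting paste mode, now interpreting.

scala> points.map(p => p.x + p.y).sum()
```

Without paste mode, the `Point` definition and its first use would be compiled as separate snippets; pasting compiles the whole block as one unit.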
### Spark-Specific Behavior

```scala
// The reset command preserves the Spark session; resetCommand receives
// the arguments typed after ":reset" (empty here)
repl.resetCommand("")
// Prints: "Note that after :reset, state of SparkSession and SparkContext is unchanged."

// Replay reinitializes the Spark context
repl.replay()
// Automatically calls initializeSpark() before replaying history
```

## Welcome Message and UI

### Custom Welcome Display

```scala
// Spark-specific welcome message shown on startup:
"""
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.6
      /_/

Using Scala 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.19)
Type in expressions to have them evaluated.
Type :help for more information.

Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = app-20231201-000001).
Spark session available as 'spark'.
"""
```

### Web UI Information Display

```scala
// Automatic Web UI URL display
val webUrl = sc.uiWebUrl.getOrElse("Not available")
println(s"Spark context Web UI available at $webUrl")

// Reverse proxy support
if (sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
  val proxyUrl = sc.getConf.get("spark.ui.reverseProxyUrl", null)
  if (proxyUrl != null) {
    println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${sc.applicationId}")
  }
}
```

## Advanced REPL Features

### Input/Output Handling

```scala
import java.io.{BufferedReader, OutputStreamWriter, StringReader}
import scala.tools.nsc.interpreter.JPrintWriter

// Custom input/output handling
val input = new BufferedReader(new StringReader("val x = 42\nprintln(x)"))
val output = new JPrintWriter(new OutputStreamWriter(System.out), true)

val repl = new SparkILoop(input, output)
// The REPL reads from `input` and writes to `output`
```

### Classpath and JAR Management

```scala
import java.io.File
import java.net.URI

// User JARs are normalized (file: URIs converted to local paths)
// and joined into a classpath string
val jars = Utils.getLocalUserJarsForShell(conf)
  .map { jar =>
    if (jar.startsWith("file:")) new File(new URI(jar)).getPath else jar
  }
  .mkString(File.pathSeparator)

// The classpath is passed through the interpreter arguments
val interpArguments = List(
  "-Yrepl-class-based",
  "-Yrepl-outdir", outputDir.getAbsolutePath,
  "-classpath", jars
)
```

## Error Handling and Recovery

### Initialization Error Handling

```scala
// Spark initialization fails fast if the interpreter reported errors
def initializeSpark(): Unit = {
  if (!intp.reporter.hasErrors) {
    // Execute the initialization commands
    initializationCommands.foreach(intp.quietRun)
  } else {
    throw new RuntimeException(
      "Scala interpreter encountered errors during initialization"
    )
  }
}
```

### Runtime Error Recovery

```scala
// Graceful handling of execution errors
try {
  val result = SparkILoop.run("invalid.scala.code")
} catch {
  case e: Exception =>
    println(s"REPL execution failed: ${e.getMessage}")
    // The REPL remains usable after errors
}
```

## Signal Handling Integration

### Interrupt Handling

The REPL integrates with Spark's signal handling for graceful job cancellation:

```scala
// Automatic setup on REPL start
Signaling.cancelOnInterrupt()

// Behavior on Ctrl+C:
// 1. First Ctrl+C: cancel active Spark jobs
// 2. Second Ctrl+C: terminate the REPL process
```

## Testing and Debugging Support

### REPL State Inspection

```scala
// Access interpreter state for debugging
val interpreter = repl.intp
val hasErrors = interpreter.reporter.hasErrors

// Check whether the REPL is ready for input
val isReady = interpreter != null && !hasErrors
```

### Test-Friendly Execution

```scala
// SparkILoop.run returns the captured REPL transcript as a String,
// so tests can assert on it directly without redirecting Console
val result = SparkILoop.run("println(\"test output\")")
assert(result.contains("test output"))
```

## Performance Considerations

### Lazy Evaluation

```scala
// The REPL follows Spark's lazy evaluation model
val rdd = sc.parallelize(1 to 1000000)
val filtered = rdd.filter(_ % 2 == 0) // No computation yet
val count = filtered.count()          // Computation is triggered here
```

### Memory Management

```scala
// The REPL writes generated class files to a managed temporary directory
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")

// Cleanup happens automatically on JVM shutdown
```
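
Cached datasets are a separate concern from class files: in a long-lived shell session they hold executor memory until explicitly released. A minimal sketch using the standard RDD caching API (the dataset here is illustrative):

```scala
// Cache an RDD that will be queried repeatedly in the session
val data = sc.parallelize(1 to 1000000).cache()
data.filter(_ % 3 == 0).count() // The first action materializes the cache

// Explicitly release the cached blocks when no longer needed
data.unpersist()
```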