# Interactive Shell

Core interactive shell functionality with Spark-specific initialization, commands, and REPL processing.

## Capabilities

### SparkILoop Class

Spark-specific interactive shell loop that extends Scala's standard ILoop with Spark initialization and custom behavior.

```scala { .api }
/**
 * A Spark-specific interactive shell extending Scala's ILoop.
 * Provides automatic Spark context/session creation and initialization.
 */
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) extends ILoop(in0, out) {

  /**
   * Alternative constructor with BufferedReader
   * @param in0 Input reader for REPL commands
   * @param out Output writer for REPL responses
   */
  def this(in0: BufferedReader, out: JPrintWriter)

  /** Default constructor using console I/O */
  def this()

  /**
   * Initialize the Spark context and session in the REPL environment.
   * Executes initialization commands to create the 'spark' and 'sc' variables
   * and imports common Spark APIs automatically.
   */
  def initializeSpark(): Unit

  /**
   * Main REPL processing loop.
   * Handles startup, interpreter creation, and command processing.
   * @param settings Scala compiler settings
   * @return true if processing completed successfully
   */
  def process(settings: Settings): Boolean

  /**
   * Create the Scala interpreter with Spark-specific customizations.
   * Uses SparkILoopInterpreter for Scala 2.11 compatibility.
   */
  override def createInterpreter(): Unit

  /** Print the Spark welcome message with version info */
  override def printWelcome(): Unit

  /** Available REPL commands (uses the standard command set) */
  override def commands: List[LoopCommand]

  /**
   * Handle the :reset command.
   * Preserves SparkSession and SparkContext state after the reset.
   * @param line Command line input
   */
  override def resetCommand(line: String): Unit

  /** Replay command history with Spark re-initialization */
  override def replay(): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.repl.SparkILoop
import java.io.{BufferedReader, StringReader, PrintWriter, StringWriter}

// Create a REPL with custom I/O
val input = new BufferedReader(new StringReader("val data = sc.parallelize(1 to 10)\ndata.sum()"))
val output = new StringWriter()
val repl = new SparkILoop(input, new PrintWriter(output))

// Process with default settings
import scala.tools.nsc.Settings
val settings = new Settings
repl.process(settings)

// Access the captured output
val result = output.toString
```

### SparkILoop Companion Object

Utility methods for running code in REPL instances programmatically.

```scala { .api }
object SparkILoop {
  /**
   * Creates an interpreter loop with default settings and feeds
   * the given code to it as input.
   * @param code Scala code to execute
   * @param sets Scala compiler settings (optional)
   * @return String output from REPL execution
   */
  def run(code: String, sets: Settings = new Settings): String

  /**
   * Run multiple lines of code in the REPL.
   * @param lines List of code lines to execute
   * @return String output from REPL execution
   */
  def run(lines: List[String]): String
}
```

**Usage Examples:**

```scala
// Execute a single code block
val result = SparkILoop.run("""
  val rdd = sc.parallelize(1 to 100)
  rdd.filter(_ % 2 == 0).count()
""")

// Execute multiple lines
val lines = List(
  "val data = sc.parallelize(1 to 10)",
  "val doubled = data.map(_ * 2)",
  "doubled.collect()"
)
val output = SparkILoop.run(lines)
```

### Initialization Commands

Pre-defined commands executed during REPL startup to set up the Spark environment.

```scala { .api }
/**
 * Commands run automatically during REPL initialization.
 * Creates the 'spark' and 'sc' variables and imports common APIs.
 */
val initializationCommands: Seq[String]
```

The initialization commands include:

1. **SparkSession Creation**: Creates the `spark` variable
2. **SparkContext Access**: Creates the `sc` variable
3. **Standard Imports**: Imports SparkContext implicits, SQL functions, etc.
4. **UI Information**: Displays the Spark UI URL

```scala
// Actual initialization commands:
"""
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
    org.apache.spark.repl.Main.sparkSession
  } else {
    org.apache.spark.repl.Main.createSparkSession()
  }
@transient val sc = {
  val _sc = spark.sparkContext
  // UI URL display logic
  _sc
}
"""
"import org.apache.spark.SparkContext._"
"import spark.implicits._"
"import spark.sql"
"import org.apache.spark.sql.functions._"
```

## REPL Customizations

### Welcome Message

A custom Spark ASCII-art welcome message with version information:

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/

Using Scala 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_275)
Type in expressions to have them evaluated.
Type :help for more information.
```

### Scala Version Compatibility

Special handling for Scala 2.11 compatibility issues:

- Uses `SparkILoopInterpreter` for Scala 2.11 to fix import-handling bugs
- Manages the context classloader correctly for thread safety
- Overrides the `process` method to ensure the proper initialization order

### Command Processing

Enhanced command processing with Spark-specific features:

- **Reset Command**: Preserves Spark session state across resets
- **Replay Command**: Re-initializes the Spark environment during replay
- **Help System**: Standard Scala REPL help with Spark context
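
The reset behavior can be exercised through the companion object's `run` helper. A minimal sketch (a hypothetical session: it assumes a runnable local Spark environment, and the exact output depends on the Spark version):

```scala
import org.apache.spark.repl.SparkILoop

// ':reset' clears user-defined bindings, but the overridden resetCommand
// re-runs the Spark initialization commands, so 'sc' and 'spark' remain
// usable after the reset.
val output = SparkILoop.run(List(
  "val tmp = 42", // a user binding, discarded by :reset
  ":reset",       // clears interpreter state, then Spark re-initializes
  "sc.version"    // 'sc' is available again after the reset
))
println(output)
```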

## Error Handling

### Interpreter Errors

```scala
if (!intp.reporter.hasErrors) {
  // Proceed with initialization
} else {
  throw new RuntimeException(s"Scala $versionString interpreter encountered errors during initialization")
}
```

### Context Classloader Management

Special handling for Scala 2.11 classloader bugs:

```scala
private def runClosure(body: => Boolean): Boolean = {
  if (isScala2_11) {
    val original = Thread.currentThread().getContextClassLoader
    try {
      body
    } finally {
      Thread.currentThread().setContextClassLoader(original)
    }
  } else {
    body
  }
}
```

## Integration Features

### Auto-Import System

Automatic import of commonly used Spark APIs:

- `SparkContext._`: RDD operations and implicits
- `spark.implicits._`: Dataset/DataFrame encoders
- `spark.sql`: SQL interface access
- `org.apache.spark.sql.functions._`: SQL functions
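
As an illustration, lines like these can be entered directly in the shell without any explicit imports, since they rely only on the pre-imported APIs (a sketch assuming a running `spark-shell` session):

```scala
// toDF comes from spark.implicits._
val df = Seq((1, "a"), (2, "b")).toDF("id", "letter")

// upper and col come from org.apache.spark.sql.functions._
df.select(upper(col("letter"))).show()

// sql comes from 'import spark.sql'
sql("SELECT 1 + 1 AS two").show()
```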

### UI Integration

Automatic display of Spark UI information:

- Detects reverse proxy configuration
- Shows the appropriate UI URLs based on the deployment
- Displays the master URL and application ID
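
The display logic is roughly as follows (a sketch assuming a live `sc`; the structure mirrors the UI URL handling described above, but the exact messages and variable names are illustrative, not Spark's verbatim code):

```scala
// Choose which URL to print based on reverse-proxy configuration
val conf = sc.getConf
if (conf.getBoolean("spark.ui.reverseProxy", false)) {
  val proxyUrl = conf.get("spark.ui.reverseProxyUrl", "")
  if (proxyUrl.nonEmpty) {
    println(s"Spark context Web UI available at $proxyUrl/proxy/${sc.applicationId}")
  } else {
    println("Spark context Web UI available at the Spark master's public URL")
  }
} else {
  // uiWebUrl is None when the UI is disabled
  sc.uiWebUrl.foreach(url => println(s"Spark context Web UI available at $url"))
}
println(s"Spark context available as 'sc' (master = ${sc.master}, app id = ${sc.applicationId}).")
```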

### File Loading

Support for loading Scala files during startup:

- `:load` command support for script files
- `:paste` command support for code blocks
- Integration with Scala compiler settings
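
For example, a startup script can be loaded either from the command line or from inside a running shell (a sketch; `init.scala` is a hypothetical file name, and `spark-shell` must be on the PATH):

```shell
# Create a hypothetical startup script
echo 'val squares = sc.parallelize(1 to 5).map(x => x * x).collect()' > init.scala

# Run it at shell startup with -i ...
spark-shell -i init.scala

# ... or load it from inside a running shell:
#   scala> :load init.scala
```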