# Session Management

Session management in the Spark REPL covers the creation, configuration, and lifecycle of the SparkSession and SparkContext instances used for interactive work.

## Core Session Management

### Main Entry Point

```scala { .api }
object Main extends Logging {
  def main(args: Array[String]): Unit
  def createSparkSession(): SparkSession
  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit
}
```

### Configuration Properties

```scala { .api }
object Main {
  val conf: SparkConf
  val rootDir: String
  val outputDir: File
  var sparkContext: SparkContext
  var sparkSession: SparkSession
  var interp: SparkILoop
}
```

## Session Creation

### Basic Session Creation

```scala
import org.apache.spark.repl.Main

// Create a SparkSession configured for REPL use
val session = Main.createSparkSession()

// Access the associated SparkContext
val context = session.sparkContext
```

### Programmatic REPL Startup

```scala
import org.apache.spark.repl.{Main, SparkILoop}

// Create a custom REPL instance
val repl = new SparkILoop()
val args = Array[String]() // Command-line arguments

// Start the REPL with this interpreter instance
Main.doMain(args, repl)
```

## Configuration

### Environment Variables

The REPL automatically detects and uses several environment variables and configuration properties:

```scala
// Environment variables read automatically:
// SPARK_HOME - Spark installation directory
// SPARK_EXECUTOR_URI - Custom executor URI

// Configuration properties read automatically:
// spark.repl.classdir - Root directory for REPL class output (defaults to the system temp directory)
// spark.repl.class.outputDir - Output directory for compiled REPL classes
```
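
The classdir fallback described above can be sketched as a plain function. The helper name `replRootDir` and the `Map`-based config are illustrative, not part of the Spark API:

```scala
// Illustrative sketch (not Spark API): prefer spark.repl.classdir when set,
// otherwise fall back to the JVM's temp directory, as documented above.
def replRootDir(conf: Map[String, String]): String =
  conf.getOrElse("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
```

In the real `Main` object, the resolved root directory is then used to create a unique per-session output directory for compiled REPL classes.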

### Automatic Configuration

```scala
// Default configuration applied automatically:
conf.setIfMissing("spark.app.name", "Spark shell")
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())

// Environment variable handling
if (System.getenv("SPARK_EXECUTOR_URI") != null) {
  conf.set("spark.executor.uri", System.getenv("SPARK_EXECUTOR_URI"))
}
if (System.getenv("SPARK_HOME") != null) {
  conf.setSparkHome(System.getenv("SPARK_HOME"))
}

// Conditional Hive support
val builder = SparkSession.builder.config(conf)
if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
  if (SparkSession.hiveClassesArePresent) {
    sparkSession = builder.enableHiveSupport().getOrCreate()
  } else {
    // Hive classes are not on the classpath; fall back to the in-memory catalog
    builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
    sparkSession = builder.getOrCreate()
  }
} else {
  sparkSession = builder.getOrCreate()
}
```

## Session Access

### Accessing the Active Session

```scala
// From within the REPL or after initialization
val currentSession = Main.sparkSession
val currentContext = Main.sparkContext

// Check whether a session is available
if (Main.sparkSession != null) {
  // Session is ready for use
  val df = Main.sparkSession.read.json("data.json")
}
```

### Session Properties

```scala
// Access session configuration
val conf = Main.sparkSession.conf
val appName = conf.get("spark.app.name")

// Access SparkContext properties
val sc = Main.sparkContext
val master = sc.master
val appId = sc.applicationId
```

## Lifecycle Management

### Session Initialization

```scala
// Automatic initialization when starting the REPL
Main.main(Array.empty) // Starts the full REPL with a session

// Manual session creation (programmatic use)
val session = Main.createSparkSession()
// The session is now also available via Main.sparkSession
```

### Session Cleanup

```scala
// Automatic cleanup on REPL exit:
// SparkContext.stop() is called automatically

// Manual cleanup (for programmatic use)
Option(Main.sparkContext).foreach(_.stop())
```
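
For programmatic use, the manual cleanup above can be wrapped in a small defensive helper. The `Stoppable` trait below is a stand-in for the relevant slice of `SparkContext`'s interface so the sketch runs without a live cluster; in real code, `ctx` would be `Main.sparkContext`:

```scala
// Stand-in for SparkContext's stop()/isStopped surface (illustrative only)
trait Stoppable { def stop(): Unit; def isStopped: Boolean }

// Stop the context only if it exists and has not already been stopped
def shutdown(ctx: Stoppable): Unit =
  Option(ctx).filterNot(_.isStopped).foreach(_.stop())
```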

## Error Handling

### Initialization Errors

```scala
try {
  val session = Main.createSparkSession()
} catch {
  case e: ClassNotFoundException
      if e.getMessage.contains("org.apache.spark.sql.connect.SparkConnectPlugin") =>
    // Handle a missing Spark Connect plugin
    logError("Failed to load spark connect plugin.")
    logError("You need to build Spark with -Pconnect.")
    sys.exit(1)

  case e: Exception =>
    // Handle other initialization failures
    logError("Failed to initialize Spark session.", e)
    sys.exit(1)
}
```

### Session State Validation

```scala
// Check whether the session is properly initialized
def isSessionReady: Boolean = {
  Main.sparkSession != null &&
    Main.sparkContext != null &&
    !Main.sparkContext.isStopped
}

// Validate session health
def validateSession(): Unit = {
  require(Main.sparkSession != null, "SparkSession not initialized")
  require(!Main.sparkContext.isStopped, "SparkContext has been stopped")
}
```

## Web UI Integration

### Automatic UI Display

The REPL prints Spark Web UI information on startup:

```scala
// Example output on session creation:
// "Spark context Web UI available at http://localhost:4040"
// "Spark context available as 'sc' (master = local[*], app id = app-20231201-000001)."
// "Spark session available as 'spark'."

// Handling reverse-proxy configurations
val proxyUrl = sc.getConf.get("spark.ui.reverseProxyUrl", null)
if (proxyUrl != null) {
  println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${sc.applicationId}")
}
```

## Advanced Configuration

### Custom Spark Configuration

```scala
// Modify the configuration before session creation
Main.conf.set("spark.executor.memory", "2g")
Main.conf.set("spark.executor.cores", "2")
Main.conf.set("spark.sql.adaptive.enabled", "true")

// Then create the session with the custom config
val session = Main.createSparkSession()
```

### JAR Management

```scala
// User JARs are automatically detected and processed
val jars = Utils.getLocalUserJarsForShell(conf)
  // Remove the file:///, file:// or file:/ scheme, if present, from each jar
  .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
  .mkString(File.pathSeparator)

// JARs are included in the interpreter classpath
val interpArguments = List(
  "-Yrepl-class-based",
  "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
  "-classpath", jars
)
```
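
The scheme-stripping step can be exercised on its own. `stripFileScheme` is an illustrative name, not a Spark API; it reproduces the mapping in the snippet above:

```scala
import java.io.File
import java.net.URI

// file: URIs become local filesystem paths; plain paths pass through unchanged
def stripFileScheme(jar: String): String =
  if (jar.startsWith("file:")) new File(new URI(jar)).getPath else jar

val classpath = Seq("file:///tmp/a.jar", "/opt/libs/b.jar")
  .map(stripFileScheme)
  .mkString(File.pathSeparator)
```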

## Testing Support

### Test-Friendly Access

```scala
// doMain is private[repl], so tests in that package can drive the REPL directly
class MyTest {
  def testREPL(): Unit = {
    val testInterp = new SparkILoop()
    val args = Array("--master", "local[1]")

    // Use the package-private doMain method
    Main.doMain(args, testInterp)

    // Verify session state
    assert(Main.sparkSession != null)
    assert(Main.sparkContext != null)
  }
}
```