# Session Management

The session management system handles client sessions, execution state, event tracking, and resource isolation. Each client connection is associated with a session that maintains its own Spark session, configuration, and execution context.
## Core Session Components

### SessionHolder

Manages Spark session state and configuration for Connect clients.

```scala { .api }
class SessionHolder(
    userId: String,
    sessionId: String,
    session: SparkSession
) {
  def userId: String
  def sessionId: String
  def session: SparkSession
  def artifactManager: SparkConnectArtifactManager
  // Additional session management methods (internal)
}
```
**Properties:**
- `userId`: Unique identifier for the user
- `sessionId`: Unique identifier for the session
- `session`: The underlying Spark session
- `artifactManager`: Handles artifacts for this session
### SparkConnectExecutionManager

Global tracker of all ExecuteHolder executions across all sessions.

```scala { .api }
class SparkConnectExecutionManager() {
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def listAbandonedExecutions: Seq[ExecuteInfo]
}
```
**Key Methods:**
- `listActiveExecutions`: Returns either the timestamp of the last execution (`Left`, when no executions are active) or the list of all active executions (`Right`)
- `listAbandonedExecutions`: Returns the list of executions that were abandoned and removed by periodic maintenance

**Note:** This is a global manager accessed through `SparkConnectService.executionManager`. Individual execution creation and management is handled through internal methods not exposed in the public API.
### ExecuteInfo

Information about an execution returned by the execution manager.

```scala { .api }
case class ExecuteInfo(
    request: proto.ExecutePlanRequest,
    userId: String,
    sessionId: String,
    operationId: String,
    jobTag: String,
    sparkSessionTags: Set[String],
    reattachable: Boolean,
    status: ExecuteStatus,
    creationTime: Long,
    lastAttachedRpcTime: Option[Long],
    closedTime: Option[Long]
)
```
**Properties:**
- `request`: The original execution request
- `userId`: User who initiated the execution
- `sessionId`: Session containing the execution
- `operationId`: Unique identifier for the operation
- `jobTag`: Spark job tag for tracking
- `sparkSessionTags`: Tags associated with the Spark session
- `reattachable`: Whether execution supports reattachment
- `status`: Current execution status
- `creationTime`: When the execution was created (timestamp)
- `lastAttachedRpcTime`: Last time RPC was attached (if any)
- `closedTime`: When execution was closed (if closed)
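
To illustrate, the timestamp fields combine into a simple lifecycle summary. The following is a sketch using only the fields declared above; `describe` is a hypothetical helper, not part of the API:

```scala
// Sketch: summarize an execution's lifecycle from its ExecuteInfo timestamps.
def describe(info: ExecuteInfo): String = info.closedTime match {
  case Some(closed) =>
    s"Operation ${info.operationId} closed after ${closed - info.creationTime} ms"
  case None =>
    val runningMs = System.currentTimeMillis() - info.creationTime
    s"Operation ${info.operationId} running for $runningMs ms (reattachable: ${info.reattachable})"
}
```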
### ExecuteHolder

Holds execution state and manages the execution lifecycle.

```scala { .api }
class ExecuteHolder(
    executeId: String,
    request: proto.ExecutePlanRequest,
    sessionHolder: SessionHolder
) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  def createdTime: Long
  def startTime: Option[Long]
  def status: ExecuteStatus
  // Additional execution state methods (internal)
}
```
## Event Management

### SessionEventsManager

Manages session-level events and monitoring.

```scala { .api }
class SessionEventsManager(sessionHolder: SessionHolder) {
  def recordSessionStart(): Unit
  def recordSessionEnd(): Unit
  def recordConfigChange(key: String, value: String): Unit
  def getSessionMetrics: SessionMetrics
}
```

### ExecuteEventsManager

Manages execution-level events and state tracking.

```scala { .api }
class ExecuteEventsManager(executeHolder: ExecuteHolder) {
  def recordExecutionStart(): Unit
  def recordExecutionEnd(success: Boolean): Unit
  def recordError(error: Throwable): Unit
  def getExecutionMetrics: ExecutionMetrics
}
```
## Streaming Query Management

### SparkConnectStreamingQueryCache

Caches streaming queries for client access and management.

```scala { .api }
class SparkConnectStreamingQueryCache(sessionHolder: SessionHolder) {
  def registerQuery(queryId: String, query: StreamingQuery): Unit
  def getQuery(queryId: String): Option[StreamingQuery]
  def removeQuery(queryId: String): Option[StreamingQuery]
  def listActiveQueries: Seq[StreamingQuery]
  def stopQuery(queryId: String): Boolean
}
```
## Session Access and Lifecycle

### Session Creation and Access

Sessions are managed through the SparkConnectService companion object:

```scala { .api }
object SparkConnectService {
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def removeSession(userId: String, sessionId: String): Option[SessionHolder]
}
```

**Key Methods:**
- `getOrCreateIsolatedSession`: Get an existing session or create a new one
- `getIsolatedSession`: Get an existing session; fails with an error if the session does not exist
- `removeSession`: Clean up and remove a session, returning the removed holder if one existed
## Usage Examples

### Creating and Managing Sessions

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Create or get existing session
val sessionHolder = SparkConnectService.getOrCreateIsolatedSession(
  userId = "user123",
  sessionId = "session456"
)

// Access the underlying Spark session
val sparkSession = sessionHolder.session

// Configure the session
sparkSession.conf.set("spark.sql.adaptive.enabled", "true")
sparkSession.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```

### Managing Executions

Executions are created and managed internally; clients inspect them through the global execution manager's public listing methods:

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Access the global execution manager
val executionManager = SparkConnectService.executionManager

// List active executions: Left holds the timestamp of the last execution
// when none are active, Right holds the currently active executions
executionManager.listActiveExecutions match {
  case Left(lastExecutionTime) =>
    println(s"No active executions; last execution at $lastExecutionTime")
  case Right(executions) =>
    println(s"Active executions: ${executions.length}")
    executions.foreach { info =>
      println(s"  ${info.operationId} (${info.status})")
    }
}

// List executions abandoned and removed by periodic maintenance
val abandoned = executionManager.listAbandonedExecutions
```

### Streaming Query Management

```scala
import org.apache.spark.sql.connect.service.SparkConnectStreamingQueryCache
import org.apache.spark.sql.streaming.StreamingQuery

// Create streaming query cache
val queryCache = new SparkConnectStreamingQueryCache(sessionHolder)

// Register a streaming query
val query: StreamingQuery = // ... created streaming query
queryCache.registerQuery("query1", query)

// List active streaming queries
val activeQueries = queryCache.listActiveQueries
activeQueries.foreach { q =>
  println(s"Query ${q.id}: ${q.status}")
}

// Stop a specific query
val stopped = queryCache.stopQuery("query1")
```

### Event Tracking

```scala
import org.apache.spark.sql.connect.service.{SessionEventsManager, ExecuteEventsManager}

// Track session events
val sessionEvents = new SessionEventsManager(sessionHolder)
sessionEvents.recordSessionStart()
sessionEvents.recordConfigChange("spark.sql.adaptive.enabled", "true")

// Track execution events
val executeEvents = new ExecuteEventsManager(executeHolder)
executeEvents.recordExecutionStart()

// Later, after execution completes
executeEvents.recordExecutionEnd(success = true)

// Get metrics
val sessionMetrics = sessionEvents.getSessionMetrics
val executionMetrics = executeEvents.getExecutionMetrics
```

## Session Isolation

Each session provides complete isolation from other sessions:

### Resource Isolation

- **Spark Session**: Each Connect session has its own SparkSession instance
- **Configuration**: Session-specific Spark configuration settings
- **Artifacts**: Isolated JAR files and class loaders per session
- **Temporary Views**: Session-scoped temporary views and UDFs
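
For example, a temporary view registered in one session is not visible from another. A minimal sketch, assuming two sessions obtained via `getOrCreateIsolatedSession` (user and session IDs are illustrative):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

val holderA = SparkConnectService.getOrCreateIsolatedSession("alice", "session-a")
val holderB = SparkConnectService.getOrCreateIsolatedSession("bob", "session-b")

// A temp view created in session A...
holderA.session.range(10).createOrReplaceTempView("numbers")

// ...does not exist in session B's catalog
assert(!holderB.session.catalog.tableExists("numbers"))
```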

### State Isolation

- **Execution Context**: Independent execution contexts and thread pools
- **Streaming Queries**: Session-specific streaming query management
- **Metrics**: Separate metrics collection per session
- **Error Handling**: Session-scoped error reporting and logging

## Concurrency and Thread Safety

The session management system handles concurrent access safely:

### Thread Safety

- All session operations are thread-safe
- Execution state is protected with appropriate synchronization
- Event recording is atomic and thread-safe
- Query cache operations are synchronized

### Concurrent Executions

- Multiple executions can run simultaneously within a session
- Execution resources are managed independently
- Reattachable executions support fault tolerance
- Streaming queries run concurrently with batch operations

## Resource Management

### Memory Management

- Session-scoped memory limits and monitoring
- Automatic cleanup of completed executions
- Streaming query resource tracking
- Artifact garbage collection

### Cleanup and Lifecycle

```scala
// Sessions are automatically cleaned up when:
// 1. The client disconnects gracefully
// 2. The session timeout is reached
// 3. The session is explicitly terminated
// 4. The server shuts down

// Manual cleanup example
val removed = SparkConnectService.removeSession(userId, sessionId)
removed.foreach { session =>
  session.session.stop()
  println(s"Session ${session.sessionId} cleaned up")
}
```

## Configuration and Tuning

### Session Configuration

Key configuration options for session management:

- `spark.connect.session.timeout`: Session idle timeout
- `spark.connect.execution.maxConcurrent`: Max concurrent executions per session
- `spark.connect.streaming.maxQueries`: Max streaming queries per session
- `spark.connect.artifacts.maxSize`: Max artifact cache size per session
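
These options can be supplied like any other Spark configuration when building the server-side session. A minimal sketch, with the keys taken from the list above and the values purely illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("spark-connect-server")
  .config("spark.connect.session.timeout", "60m")        // idle sessions expire after an hour
  .config("spark.connect.execution.maxConcurrent", "16") // cap concurrent executions per session
  .getOrCreate()
```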

### Performance Tuning

- Configure appropriate timeouts for long-running operations
- Tune concurrent execution limits based on cluster resources
- Monitor session metrics for resource usage patterns
- Implement custom cleanup policies for inactive sessions

## Error Handling and Recovery

### Session Recovery

- Sessions can be recovered after temporary disconnections
- Reattachable executions provide fault tolerance
- Streaming queries automatically recover from failures
- Artifact state is persisted across reconnections

### Error Reporting

- Session-level errors are reported with context
- Execution errors include session and user information
- Structured error messages for client consumption
- Detailed server-side logging for debugging