The session management system handles client sessions, execution state, event tracking, and resource isolation. Each client connection is associated with a session that maintains its own Spark session, configuration, and execution context.
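As an orientation, the typical lifecycle ties these pieces together through the public entry points documented below (a hedged sketch; the user and session identifiers are illustrative):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// 1. A client connection is mapped to an isolated session,
//    keyed by (userId, sessionId)
val holder = SparkConnectService.getOrCreateIsolatedSession("user123", "session456")

// 2. Work runs against the session's own SparkSession
holder.session.sql("SELECT 1").collect()

// 3. When the client is done, the session is removed and its resources released
SparkConnectService.removeSession("user123", "session456")
  .foreach(_.session.stop())
```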
Manages Spark session state and configuration for Connect clients.
class SessionHolder(
    userId: String,
    sessionId: String,
    session: SparkSession
) {
  def userId: String
  def sessionId: String
  def session: SparkSession
  def artifactManager: SparkConnectArtifactManager
  // Additional session management methods (internal)
}

Properties:
userId: Unique identifier for the user
sessionId: Unique identifier for the session
session: The underlying Spark session
artifactManager: Handles artifacts for this session

Global tracker of all ExecuteHolder executions across all sessions.
class SparkConnectExecutionManager() {
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def listAbandonedExecutions: Seq[ExecuteInfo]
}

Key Methods:
listActiveExecutions: Returns either the timestamp of the last execution (Left, when no executions are active) or the list of all active executions (Right)
listAbandonedExecutions: Returns the list of executions that were abandoned and removed by periodic maintenance

Note: This is a global manager accessed through SparkConnectService.executionManager. Individual execution creation and management are handled through internal methods not exposed in the public API.
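For example, a monitoring hook might summarize each active execution using the fields of the ExecuteInfo record defined below (a sketch; the helper name is illustrative):

```scala
import org.apache.spark.sql.connect.service.ExecuteInfo

// Illustrative helper: summarize one active execution for a monitoring log
def summarize(info: ExecuteInfo, now: Long): String = {
  val ageMs = now - info.creationTime
  val attached = info.lastAttachedRpcTime.isDefined
  s"${info.operationId} [${info.status}] age=${ageMs}ms " +
    s"reattachable=${info.reattachable} attached=$attached"
}
```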
Information about an execution returned by the execution manager.
case class ExecuteInfo(
    request: proto.ExecutePlanRequest,
    userId: String,
    sessionId: String,
    operationId: String,
    jobTag: String,
    sparkSessionTags: Set[String],
    reattachable: Boolean,
    status: ExecuteStatus,
    creationTime: Long,
    lastAttachedRpcTime: Option[Long],
    closedTime: Option[Long]
)

Properties:
request: The original execution request
userId: User who initiated the execution
sessionId: Session containing the execution
operationId: Unique identifier for the operation
jobTag: Spark job tag for tracking
sparkSessionTags: Tags associated with the Spark session
reattachable: Whether the execution supports reattachment
status: Current execution status
creationTime: When the execution was created (timestamp)
lastAttachedRpcTime: Last time an RPC was attached (if any)
closedTime: When the execution was closed (if closed)

Holds execution state and manages the execution lifecycle.
class ExecuteHolder(
    executeId: String,
    request: proto.ExecutePlanRequest,
    sessionHolder: SessionHolder
) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  def createdTime: Long
  def startTime: Option[Long]
  def status: ExecuteStatus
  // Additional execution state methods (internal)
}

Manages session-level events and monitoring.
class SessionEventsManager(sessionHolder: SessionHolder) {
  def recordSessionStart(): Unit
  def recordSessionEnd(): Unit
  def recordConfigChange(key: String, value: String): Unit
  def getSessionMetrics: SessionMetrics
}

Manages execution-level events and state tracking.
class ExecuteEventsManager(executeHolder: ExecuteHolder) {
  def recordExecutionStart(): Unit
  def recordExecutionEnd(success: Boolean): Unit
  def recordError(error: Throwable): Unit
  def getExecutionMetrics: ExecutionMetrics
}

Caches streaming queries for client access and management.
class SparkConnectStreamingQueryCache(sessionHolder: SessionHolder) {
  def registerQuery(queryId: String, query: StreamingQuery): Unit
  def getQuery(queryId: String): Option[StreamingQuery]
  def removeQuery(queryId: String): Option[StreamingQuery]
  def listActiveQueries: Seq[StreamingQuery]
  def stopQuery(queryId: String): Boolean
}

Sessions are managed through the SparkConnectService companion object:
object SparkConnectService {
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def removeSession(userId: String, sessionId: String): Option[SessionHolder]
}

Key Methods:
getOrCreateIsolatedSession: Get an existing session or create a new one
getIsolatedSession: Get an existing session without creating one
removeSession: Clean up and remove a session, returning the removed SessionHolder (if one existed)

import org.apache.spark.sql.connect.service.SparkConnectService
// Create or get existing session
val sessionHolder = SparkConnectService.getOrCreateIsolatedSession(
  userId = "user123",
  sessionId = "session456"
)
// Access the underlying Spark session
val sparkSession = sessionHolder.session
// Configure the session
sparkSession.conf.set("spark.sql.adaptive.enabled", "true")
sparkSession.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

import org.apache.spark.sql.connect.service.SparkConnectService

// The execution manager is a global singleton accessed through the service;
// execution creation and interruption happen through internal methods
val executionManager = SparkConnectService.executionManager

// List active executions: Left carries the timestamp of the last execution
// (when nothing is active), Right carries the active executions
executionManager.listActiveExecutions match {
  case Left(lastExecutionTime) =>
    println(s"No active executions; last execution at $lastExecutionTime")
  case Right(executions) =>
    println(s"Active executions: ${executions.length}")
}

// List executions abandoned and removed by periodic maintenance
val abandonedExecutions = executionManager.listAbandonedExecutions

import org.apache.spark.sql.connect.service.SparkConnectStreamingQueryCache
import org.apache.spark.sql.streaming.StreamingQuery

// Create a streaming query cache for the session
val queryCache = new SparkConnectStreamingQueryCache(sessionHolder)

// Register a streaming query
val query: StreamingQuery = // ... created streaming query
queryCache.registerQuery("query1", query)

// List active streaming queries
val activeQueries = queryCache.listActiveQueries
activeQueries.foreach { q =>
  println(s"Query ${q.id}: ${q.status}")
}

// Stop a specific query
val stopped = queryCache.stopQuery("query1")

import org.apache.spark.sql.connect.service.{SessionEventsManager, ExecuteEventsManager}
// Track session events
val sessionEvents = new SessionEventsManager(sessionHolder)
sessionEvents.recordSessionStart()
sessionEvents.recordConfigChange("spark.sql.adaptive.enabled", "true")

// Track execution events (executeHolder is created internally per execution)
val executeEvents = new ExecuteEventsManager(executeHolder)
executeEvents.recordExecutionStart()

// Later, after the execution completes
executeEvents.recordExecutionEnd(success = true)

// Get metrics
val sessionMetrics = sessionEvents.getSessionMetrics
val executionMetrics = executeEvents.getExecutionMetrics

Each session provides complete isolation from other sessions.
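For instance, configuration set in one session is not visible in another (a sketch; the config key and session IDs are illustrative):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Two sessions for the same user remain fully isolated
val a = SparkConnectService.getOrCreateIsolatedSession("user123", "session-A")
val b = SparkConnectService.getOrCreateIsolatedSession("user123", "session-B")

// A config change in session A...
a.session.conf.set("spark.sql.shuffle.partitions", "4")

// ...does not affect session B, which keeps its own value
println(b.session.conf.get("spark.sql.shuffle.partitions"))
```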
The session management system handles concurrent access safely. Sessions are cleaned up automatically in several situations:
// Sessions are automatically cleaned up when:
// 1. Client disconnects gracefully
// 2. Session timeout is reached
// 3. Explicit session termination
// 4. Server shutdown
// Manual cleanup example
val removed = SparkConnectService.removeSession(userId, sessionId)
removed.foreach { session =>
  session.session.stop()
  println(s"Session ${session.sessionId} cleaned up")
}

Key configuration options for session management:
spark.connect.session.timeout: Session idle timeout
spark.connect.execution.maxConcurrent: Maximum concurrent executions per session
spark.connect.streaming.maxQueries: Maximum streaming queries per session
spark.connect.artifacts.maxSize: Maximum artifact cache size per session
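These options are regular Spark configuration entries, so they can be supplied at server startup like any other setting (a sketch; the values shown are illustrative and the keys are the ones listed above):

```scala
import org.apache.spark.SparkConf

// Illustrative server-side configuration using the keys listed above
val conf = new SparkConf()
  .set("spark.connect.session.timeout", "30m")
  .set("spark.connect.execution.maxConcurrent", "50")
  .set("spark.connect.streaming.maxQueries", "20")
  .set("spark.connect.artifacts.maxSize", "512m")
```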