Session Management

The session management system handles client sessions, execution state, event tracking, and resource isolation. Each client connection is associated with a session that maintains its own Spark session, configuration, and execution context.

Core Session Components

SessionHolder

Manages Spark session state and configuration for Connect clients.

class SessionHolder(
  userId: String, 
  sessionId: String, 
  session: SparkSession
) {
  def userId: String
  def sessionId: String
  def session: SparkSession
  def artifactManager: SparkConnectArtifactManager
  // Additional session management methods (internal)
}

Properties:

  • userId: Unique identifier for the user
  • sessionId: Unique identifier for the session
  • session: The underlying Spark session
  • artifactManager: Handles artifacts for this session

SparkConnectExecutionManager

Global tracker of all ExecuteHolder executions across all sessions.

class SparkConnectExecutionManager() {
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def listAbandonedExecutions: Seq[ExecuteInfo]
}

Key Methods:

  • listActiveExecutions: Returns either the timestamp of last execution (if no active executions) or the list of all active executions
  • listAbandonedExecutions: Returns list of executions that were abandoned and removed by periodic maintenance

Note: This is a global manager accessed through SparkConnectService.executionManager. Individual execution creation and management is handled through internal methods not exposed in the public API.

ExecuteInfo

Information about an execution returned by the execution manager.

case class ExecuteInfo(
  request: proto.ExecutePlanRequest,
  userId: String,
  sessionId: String,
  operationId: String,
  jobTag: String,
  sparkSessionTags: Set[String],
  reattachable: Boolean,
  status: ExecuteStatus,
  creationTime: Long,
  lastAttachedRpcTime: Option[Long],
  closedTime: Option[Long]
)

Properties:

  • request: The original execution request
  • userId: User who initiated the execution
  • sessionId: Session containing the execution
  • operationId: Unique identifier for the operation
  • jobTag: Spark job tag for tracking
  • sparkSessionTags: Tags associated with the Spark session
  • reattachable: Whether execution supports reattachment
  • status: Current execution status
  • creationTime: When the execution was created (timestamp)
  • lastAttachedRpcTime: Last time RPC was attached (if any)
  • closedTime: When execution was closed (if closed)
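As an illustration, the ExecuteInfo records returned by listActiveExecutions can be filtered on these fields, for example to find reattachable executions older than one minute (a sketch; the one-minute cutoff and printed format are arbitrary):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Find reattachable executions that have been running for more than a minute
val cutoff = System.currentTimeMillis() - 60 * 1000L

SparkConnectService.executionManager.listActiveExecutions match {
  case Right(infos) =>
    infos
      .filter(info => info.reattachable && info.creationTime < cutoff)
      .foreach(info =>
        println(s"${info.operationId} (user ${info.userId}) started at ${info.creationTime}"))
  case Left(lastExecutionTime) =>
    println(s"No active executions; last one seen at $lastExecutionTime")
}
```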

ExecuteHolder

Holds execution state and manages the execution lifecycle.

class ExecuteHolder(
  executeId: String,
  request: proto.ExecutePlanRequest,
  sessionHolder: SessionHolder
) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  def createdTime: Long
  def startTime: Option[Long]
  def status: ExecuteStatus
  // Additional execution state methods (internal)
}

Event Management

SessionEventsManager

Manages session-level events and monitoring.

class SessionEventsManager(sessionHolder: SessionHolder) {
  def recordSessionStart(): Unit
  def recordSessionEnd(): Unit
  def recordConfigChange(key: String, value: String): Unit
  def getSessionMetrics: SessionMetrics
}

ExecuteEventsManager

Manages execution-level events and state tracking.

class ExecuteEventsManager(executeHolder: ExecuteHolder) {
  def recordExecutionStart(): Unit
  def recordExecutionEnd(success: Boolean): Unit
  def recordError(error: Throwable): Unit
  def getExecutionMetrics: ExecutionMetrics
}

Streaming Query Management

SparkConnectStreamingQueryCache

Caches streaming queries for client access and management.

class SparkConnectStreamingQueryCache(sessionHolder: SessionHolder) {
  def registerQuery(queryId: String, query: StreamingQuery): Unit
  def getQuery(queryId: String): Option[StreamingQuery]
  def removeQuery(queryId: String): Option[StreamingQuery]
  def listActiveQueries: Seq[StreamingQuery]
  def stopQuery(queryId: String): Boolean
}

Session Access and Lifecycle

Session Creation and Access

Sessions are managed through the SparkConnectService companion object:

object SparkConnectService {
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def removeSession(userId: String, sessionId: String): Option[SessionHolder]
}

Key Methods:

  • getOrCreateIsolatedSession: Get existing session or create new one
  • getIsolatedSession: Get an existing session; fails if the session does not exist
  • removeSession: Clean up and remove session

Usage Examples

Creating and Managing Sessions

import org.apache.spark.sql.connect.service.SparkConnectService

// Create or get existing session
val sessionHolder = SparkConnectService.getOrCreateIsolatedSession(
  userId = "user123",
  sessionId = "session456"
)

// Access the underlying Spark session
val sparkSession = sessionHolder.session

// Configure the session
sparkSession.conf.set("spark.sql.adaptive.enabled", "true")
sparkSession.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Managing Executions

import org.apache.spark.sql.connect.service.SparkConnectService

// Access the global execution manager (execution creation is handled
// internally by the service; see the note above)
val executionManager = SparkConnectService.executionManager

// List active executions. Left holds the timestamp of the last execution
// when none are currently active; Right holds the active executions.
executionManager.listActiveExecutions match {
  case Left(lastExecutionTime) =>
    println(s"No active executions; last finished at $lastExecutionTime")
  case Right(executions) =>
    println(s"Active executions: ${executions.length}")
}

// List executions that were abandoned and removed by periodic maintenance
val abandoned = executionManager.listAbandonedExecutions

Streaming Query Management

import org.apache.spark.sql.connect.service.SparkConnectStreamingQueryCache
import org.apache.spark.sql.streaming.StreamingQuery

// Create streaming query cache
val queryCache = new SparkConnectStreamingQueryCache(sessionHolder)

// Register a streaming query
val query: StreamingQuery = // ... created streaming query
queryCache.registerQuery("query1", query)

// List active streaming queries
val activeQueries = queryCache.listActiveQueries
activeQueries.foreach { query =>
  println(s"Query ${query.id}: ${query.status}")
}

// Stop a specific query
val stopped = queryCache.stopQuery("query1")

Event Tracking

import org.apache.spark.sql.connect.service.{SessionEventsManager, ExecuteEventsManager}

// Track session events
val sessionEvents = new SessionEventsManager(sessionHolder)
sessionEvents.recordSessionStart()
sessionEvents.recordConfigChange("spark.sql.adaptive.enabled", "true")

// Track execution events
val executeEvents = new ExecuteEventsManager(executeHolder) 
executeEvents.recordExecutionStart()

// Later, after execution completes
executeEvents.recordExecutionEnd(success = true)

// Get metrics
val sessionMetrics = sessionEvents.getSessionMetrics
val executionMetrics = executeEvents.getExecutionMetrics

Session Isolation

Each session provides complete isolation from other sessions:

Resource Isolation

  • Spark Session: Each Connect session has its own SparkSession instance
  • Configuration: Session-specific Spark configuration settings
  • Artifacts: Isolated JAR files and class loaders per session
  • Temporary Views: Session-scoped temporary views and UDFs

State Isolation

  • Execution Context: Independent execution contexts and thread pools
  • Streaming Queries: Session-specific streaming query management
  • Metrics: Separate metrics collection per session
  • Error Handling: Session-scoped error reporting and logging
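A minimal sketch of this isolation, using the getOrCreateIsolatedSession API shown earlier: a temporary view created in one session is invisible to another, even for the same user.

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Two sessions for the same user are still fully isolated
val a = SparkConnectService.getOrCreateIsolatedSession("user123", "session-a")
val b = SparkConnectService.getOrCreateIsolatedSession("user123", "session-b")

// A temporary view registered in session A...
a.session.range(10).createOrReplaceTempView("numbers")

// ...does not exist in session B
assert(a.session.catalog.tableExists("numbers"))
assert(!b.session.catalog.tableExists("numbers"))
```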

Concurrency and Thread Safety

The session management system handles concurrent access safely:

Thread Safety

  • All session operations are thread-safe
  • Execution state is protected with appropriate synchronization
  • Event recording is atomic and thread-safe
  • Query cache operations are synchronized

Concurrent Executions

  • Multiple executions can run simultaneously within a session
  • Execution resources are managed independently
  • Reattachable executions support fault tolerance
  • Streaming queries run concurrently with batch operations
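Because session operations are thread-safe, concurrent batch jobs can be launched against a single session, for instance with Scala Futures (a sketch, assuming a sessionHolder obtained as in the usage examples above):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Launch two independent jobs concurrently within one session
val spark = sessionHolder.session
val jobs = Seq(
  Future(spark.range(1000000L).count()),
  Future(spark.range(500000L).filter("id % 2 = 0").count())
)

// Both jobs run simultaneously; collect the results
val results = Await.result(Future.sequence(jobs), 5.minutes)
```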

Resource Management

Memory Management

  • Session-scoped memory limits and monitoring
  • Automatic cleanup of completed executions
  • Streaming query resource tracking
  • Artifact garbage collection

Cleanup and Lifecycle

// Sessions are automatically cleaned up when:
// 1. Client disconnects gracefully
// 2. Session timeout is reached
// 3. Explicit session termination
// 4. Server shutdown

// Manual cleanup example
val removed = SparkConnectService.removeSession(userId, sessionId)
removed.foreach { session =>
  session.session.stop()
  println(s"Session ${session.sessionId} cleaned up")
}

Configuration and Tuning

Session Configuration

Key configuration options for session management:

  • spark.connect.session.timeout: Session idle timeout
  • spark.connect.execution.maxConcurrent: Max concurrent executions per session
  • spark.connect.streaming.maxQueries: Max streaming queries per session
  • spark.connect.artifacts.maxSize: Max artifact cache size per session
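These options would typically be set on the server's SparkConf at startup. A sketch (the key names are as listed above; verify them against your Spark version before relying on them):

```scala
import org.apache.spark.SparkConf

// Hypothetical values; tune to your workload
val conf = new SparkConf()
  .set("spark.connect.session.timeout", "30m")
  .set("spark.connect.execution.maxConcurrent", "8")
  .set("spark.connect.streaming.maxQueries", "10")
  .set("spark.connect.artifacts.maxSize", "512m")
```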

Performance Tuning

  • Configure appropriate timeouts for long-running operations
  • Tune concurrent execution limits based on cluster resources
  • Monitor session metrics for resource usage patterns
  • Implement custom cleanup policies for inactive sessions

Error Handling and Recovery

Session Recovery

  • Sessions can be recovered after temporary disconnections
  • Reattachable executions provide fault tolerance
  • Streaming queries automatically recover from failures
  • Artifact state is persisted across reconnections

Error Reporting

  • Session-level errors are reported with context
  • Execution errors include session and user information
  • Structured error messages for client consumption
  • Detailed server-side logging for debugging