The Spark Connect Server provides comprehensive web-based monitoring and debugging capabilities through integration with Spark's web UI system. This includes server status, session monitoring, execution tracking, and performance metrics.
Main web UI tab for Connect server monitoring.
// Web UI tab registered under the "connect" path for Connect server monitoring.
// NOTE(review): API sketch from docs — member bodies are omitted.
class SparkConnectServerTab(
store: SparkConnectServerAppStatusStore,
sparkUI: SparkUI
) extends SparkUITab(sparkUI, "connect") {
// Removes this tab from the Spark UI.
def detach(): Unit
// Position of this tab relative to the other UI tabs.
def displayOrder: Int
}

Key Methods:
- detach(): Remove the tab from the Spark UI
- displayOrder: Determines tab ordering in the UI

Main server monitoring page showing overall server status.
// Landing page (empty path, i.e. /connect/) showing overall server status.
// NOTE(review): API sketch from docs — render body omitted.
class SparkConnectServerPage(
parent: SparkConnectServerTab,
store: SparkConnectServerAppStatusStore
) extends WebUIPage("") {
// Produces the page's HTML nodes for an incoming HTTP request.
def render(request: HttpServletRequest): Seq[Node]
}

Session-specific monitoring page with detailed session information.
// Per-session detail page, served under the "session" sub-path of the tab.
// NOTE(review): API sketch from docs — render body omitted.
class SparkConnectServerSessionPage(
parent: SparkConnectServerTab,
store: SparkConnectServerAppStatusStore
) extends WebUIPage("session") {
// Produces the session page's HTML nodes for an incoming HTTP request.
def render(request: HttpServletRequest): Seq[Node]
}

Event listener that collects data for the web UI.
// Listener that records Connect session/execution lifecycle events into the
// status store that backs the web UI.
// NOTE(review): API sketch from docs — handler bodies are omitted.
class SparkConnectServerListener(
store: SparkConnectServerAppStatusStore,
sparkConf: SparkConf
) extends SparkListener {
// Session lifecycle: start/end of a Connect client session.
def onSessionStart(event: SparkConnectSessionStartEvent): Unit
def onSessionEnd(event: SparkConnectSessionEndEvent): Unit
// Execution lifecycle: start/end of an individual Connect execution.
def onExecutionStart(event: SparkConnectExecutionStartEvent): Unit
def onExecutionEnd(event: SparkConnectExecutionEndEvent): Unit
}

Event Handlers:
- onSessionStart: Record new session creation
- onSessionEnd: Record session termination
- onExecutionStart: Track new execution start
- onExecutionEnd: Record execution completion

Data store for UI information with configurable retention policies.
// Read-side accessor over the ElementTrackingStore holding session, execution,
// and server-level metric data for the Connect UI.
// NOTE(review): API sketch from docs — method bodies are omitted.
class SparkConnectServerAppStatusStore(
sparkConf: SparkConf,
store: ElementTrackingStore
) {
// Lookup for a single session; None when the id is unknown or evicted.
def getSessionInfo(sessionId: String): Option[SessionInfo]
def getAllSessions: Seq[SessionInfo]
// Lookup for a single execution; None when the id is unknown or evicted.
def getExecutionInfo(executeId: String): Option[ExecutionInfo]
def getActiveExecutions: Seq[ExecutionInfo]
// Aggregated server-level counters and memory usage.
def getServerMetrics: ServerMetrics
}

Key Methods:
- getSessionInfo: Get detailed information about a specific session
- getAllSessions: Get information about all sessions
- getExecutionInfo: Get details about a specific execution
- getActiveExecutions: Get all currently running executions
- getServerMetrics: Get overall server performance metrics

Plugin for Spark History Server integration.
// History Server hook that rebuilds the Connect UI for completed applications.
// NOTE(review): API sketch from docs — method bodies are omitted.
class SparkConnectServerHistoryServerPlugin extends AppHistoryServerPlugin {
def createApplicationInfo(info: ApplicationInfo): ConnectApplicationInfo
def setupUI(ui: ApplicationHistoryUI): Unit
}

case class SessionInfo(
sessionId: String,
userId: String,
startTime: Long,
endTime: Option[Long],
executionCount: Int,
artifactCount: Int,
status: SessionStatus
)
sealed trait SessionStatus
case object SessionActive extends SessionStatus
case object SessionIdle extends SessionStatus
case object SessionClosed extends SessionStatus

case class ExecutionInfo(
executeId: String,
sessionId: String,
userId: String,
startTime: Long,
endTime: Option[Long],
status: ExecutionStatus,
planType: String,
metrics: ExecutionMetrics
)
sealed trait ExecutionStatus
case object ExecutionRunning extends ExecutionStatus
case object ExecutionCompleted extends ExecutionStatus
case object ExecutionFailed extends ExecutionStatus
case object ExecutionCancelled extends ExecutionStatus

case class ServerMetrics(
uptime: Long,
totalSessions: Long,
activeSessions: Int,
totalExecutions: Long,
activeExecutions: Int,
totalArtifacts: Long,
memoryUsage: MemoryUsage,
requestRate: Double
)
case class MemoryUsage(
used: Long,
committed: Long,
max: Long
)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.ui.{SparkConnectServerTab, SparkConnectServerListener, SparkConnectServerAppStatusStore}

// Create Spark session with UI enabled
val spark = SparkSession.builder()
  .appName("MyConnectApp")
  .config("spark.ui.enabled", "true")
  .config("spark.ui.port", "4040")
  .getOrCreate()

// Set up Connect server UI components.
// SparkConnectServerAppStatusStore takes a SparkConf; spark.conf returns a
// RuntimeConfig, so use the SparkContext's conf instead.
val sparkConf = spark.sparkContext.getConf
val store = new SparkConnectServerAppStatusStore(sparkConf, elementStore)
val listener = new SparkConnectServerListener(store, sparkConf)
// SparkConnectServerTab takes (store, sparkUI).
// NOTE(review): sparkContext.ui is Option[SparkUI]; .get is safe here only
// because spark.ui.enabled was set to true above — confirm in your setup.
val tab = new SparkConnectServerTab(store, spark.sparkContext.ui.get)

// Register listener to collect events
spark.sparkContext.addSparkListener(listener)
// UI is now available at http://localhost:4040/connect/

import org.apache.spark.sql.connect.ui.SparkConnectServerAppStatusStore
// Get server metrics
val metrics = store.getServerMetrics
println(s"Server uptime: ${metrics.uptime / 1000} seconds")
println(s"Active sessions: ${metrics.activeSessions}")
println(s"Active executions: ${metrics.activeExecutions}")
println(s"Memory usage: ${metrics.memoryUsage.used / 1024 / 1024} MB")

// Get all sessions
val allSessions = store.getAllSessions
allSessions.foreach { session =>
  println(s"Session ${session.sessionId} (${session.userId}): ${session.status}")
  println(s"  Started: ${new Date(session.startTime)}")
  println(s"  Executions: ${session.executionCount}")
  println(s"  Artifacts: ${session.artifactCount}")
}

// Get specific session details
val sessionInfo = store.getSessionInfo("session123")
sessionInfo.foreach { info =>
  println(s"Session details: $info")
}

// Get active executions
val activeExecutions = store.getActiveExecutions
println(s"Currently running ${activeExecutions.length} executions")
activeExecutions.foreach { execution =>
  val duration = System.currentTimeMillis() - execution.startTime
  println(s"Execution ${execution.executeId}: ${execution.planType} (${duration}ms)")
}

// Get execution history
val executionInfo = store.getExecutionInfo("exec456")
executionInfo.foreach { info =>
  // endTime is an Option — avoid .get, which throws for still-running executions
  info.endTime.foreach { end =>
    println(s"Execution completed in ${end - info.startTime}ms")
  }
  println(s"Status: ${info.status}")
  println(s"Metrics: ${info.metrics}")
}

The main server page displays:
Individual session pages show:
Execution detail pages include:
Key configuration parameters for monitoring:
// UI enablement and port
spark.ui.enabled=true
spark.ui.port=4040
// Connect-specific UI settings
spark.connect.ui.enabled=true
spark.connect.ui.retainedSessions=200
spark.connect.ui.retainedExecutions=1000
spark.connect.ui.retainedQueries=100
// History retention
spark.connect.ui.session.timeout=7d
spark.connect.ui.execution.timeout=1d

// Event collection settings
spark.connect.ui.listener.enabled=true
spark.connect.ui.metrics.collection.interval=10s
spark.connect.ui.metrics.retention.period=24h
// Performance monitoring
spark.connect.ui.monitoring.detailed=true
spark.connect.ui.monitoring.query.plans=true

import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.connect.ui.SparkConnectServerAppStatusStore
// Custom listener reacting to Connect server events.
// SparkListener has no onSessionStart/onExecutionEnd members, so the original
// `override def` versions could not compile; Connect events arrive as generic
// SparkListenerEvents and must be handled through onOtherEvent.
class CustomConnectListener(store: SparkConnectServerAppStatusStore) extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkConnectSessionStartEvent =>
      // Custom session start handling
      logInfo(s"New Connect session: ${e.sessionId}")
      // Update custom metrics
      incrementSessionCounter()
      // Send to external monitoring system
      sendToMetricsSystem(e)
    case e: SparkConnectExecutionEndEvent =>
      // Track execution patterns
      analyzeExecutionPattern(e)
      // Update performance metrics
      updatePerformanceStats(e)
    case _ => // ignore unrelated events
  }
}

// Integration with external monitoring systems
// Bridges Connect server metrics to external monitoring backends.
class ExternalMetricsReporter(store: SparkConnectServerAppStatusStore) {
  // Reads the current ServerMetrics snapshot and pushes each value out.
  def reportMetrics(): Unit = {
    val snapshot = store.getServerMetrics
    // Prometheus gauges
    prometheusRegistry.gauge("spark_connect_active_sessions").set(snapshot.activeSessions)
    prometheusRegistry.gauge("spark_connect_active_executions").set(snapshot.activeExecutions)
    // DataDog gauges
    statsd.gauge("spark.connect.memory.used", snapshot.memoryUsage.used)
    statsd.gauge("spark.connect.request.rate", snapshot.requestRate)
  }
}

Troubleshooting: verify that spark.ui.enabled=true and that the UI port is accessible.

The UI provides detailed debug information for:
The monitoring system integrates with Spark's logging:
// Log levels for debugging
spark.sql.connect.ui.logLevel=DEBUG
spark.sql.connect.listener.logLevel=INFO
// Custom log appenders for UI events
spark.sql.connect.ui.logAppender=UIEventAppender