Server Management

This document covers the core server functionality including lifecycle management, service configuration, and standalone deployment options.

Core Server Classes

SparkConnectService

Main gRPC service implementation that handles all client requests.

class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  def executePlan(request: proto.ExecutePlanRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def analyzePlan(request: proto.AnalyzePlanRequest, responseObserver: StreamObserver[proto.AnalyzePlanResponse]): Unit
  def config(request: proto.ConfigRequest, responseObserver: StreamObserver[proto.ConfigResponse]): Unit
  def addArtifacts(responseObserver: StreamObserver[proto.AddArtifactsResponse]): StreamObserver[proto.AddArtifactsRequest]
  def artifactStatus(request: proto.ArtifactStatusesRequest, responseObserver: StreamObserver[proto.ArtifactStatusesResponse]): Unit
  def interrupt(request: proto.InterruptRequest, responseObserver: StreamObserver[proto.InterruptResponse]): Unit
  def reattachExecute(request: proto.ReattachExecuteRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def releaseExecute(request: proto.ReleaseExecuteRequest, responseObserver: StreamObserver[proto.ReleaseExecuteResponse]): Unit
}

SparkConnectService Companion Object

Server lifecycle management and session utilities.

object SparkConnectService {
  def start(sc: SparkContext): Unit
  def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def server: Server
  def uiTab: Option[SparkConnectServerTab]
}
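
The two session methods differ only in what happens when no session exists for a (userId, sessionId) pair. The sketch below illustrates that get-or-create versus lookup-only distinction with a plain concurrent map. DemoSession and DemoSessionRegistry are hypothetical stand-ins, not Spark classes, and the choice to throw on a missing session is an assumption of this sketch.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for SessionHolder; the real class wraps a SparkSession.
final case class DemoSession(userId: String, sessionId: String)

// Illustrative cache keyed by (userId, sessionId); not the actual Spark implementation.
object DemoSessionRegistry {
  private val sessions = new ConcurrentHashMap[(String, String), DemoSession]()

  // Returns the existing session, or atomically creates and stores a new one.
  def getOrCreate(userId: String, sessionId: String): DemoSession =
    sessions.computeIfAbsent((userId, sessionId), _ => DemoSession(userId, sessionId))

  // Lookup only. This sketch assumes a missing session is an error.
  def get(userId: String, sessionId: String): DemoSession =
    Option(sessions.get((userId, sessionId))).getOrElse(
      throw new NoSuchElementException(s"No session $sessionId for user $userId"))
}
```

Keying on both values keeps sessions with the same sessionId but different users isolated from each other.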

SparkConnectServer

Standalone server application entry point.

object SparkConnectServer {
  def main(args: Array[String]): Unit
}

SimpleSparkConnectService

Simplified service for testing and development.

object SimpleSparkConnectService {
  def main(args: Array[String]): Unit
}

Request Handlers

The server delegates request processing to specialized handler classes:

SparkConnectExecutePlanHandler

class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ExecutePlanRequest): Unit
}

SparkConnectAnalyzeHandler

class SparkConnectAnalyzeHandler(responseObserver: StreamObserver[proto.AnalyzePlanResponse]) {
  def handle(request: proto.AnalyzePlanRequest): Unit
}

SparkConnectConfigHandler

class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) {
  def handle(request: proto.ConfigRequest): Unit
}

SparkConnectAddArtifactsHandler

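// addArtifacts is a client-streaming RPC, so this handler is itself the
// request observer rather than exposing a single handle(request) method.
class SparkConnectAddArtifactsHandler(responseObserver: StreamObserver[proto.AddArtifactsResponse])
    extends StreamObserver[proto.AddArtifactsRequest] {
  def onNext(request: proto.AddArtifactsRequest): Unit
  def onError(throwable: Throwable): Unit
  def onCompleted(): Unit
}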

SparkConnectArtifactStatusesHandler

class SparkConnectArtifactStatusesHandler(responseObserver: StreamObserver[proto.ArtifactStatusesResponse]) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}

SparkConnectInterruptHandler

class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.InterruptResponse]) {
  def handle(request: proto.InterruptRequest): Unit
}

SparkConnectReattachExecuteHandler

class SparkConnectReattachExecuteHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ReattachExecuteRequest): Unit
}

SparkConnectReleaseExecuteHandler

class SparkConnectReleaseExecuteHandler(responseObserver: StreamObserver[proto.ReleaseExecuteResponse]) {
  def handle(request: proto.ReleaseExecuteRequest): Unit
}
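
The unary handlers listed here share one shape: the constructor takes the response observer and handle() processes a single request. A minimal sketch of how a service method might delegate to such a handler follows. It uses DemoObserver in place of gRPC's StreamObserver so it runs without dependencies, and every Demo* name is hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Minimal stand-in for io.grpc.stub.StreamObserver (same three callbacks).
trait DemoObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical request and response types.
final case class DemoConfigRequest(key: String)
final case class DemoConfigResponse(key: String, value: Option[String])

// One handler per RPC: observer in the constructor, a single handle() method.
class DemoConfigHandler(
    responseObserver: DemoObserver[DemoConfigResponse],
    conf: Map[String, String]) {
  def handle(request: DemoConfigRequest): Unit = {
    responseObserver.onNext(DemoConfigResponse(request.key, conf.get(request.key)))
    responseObserver.onCompleted()
  }
}

// The service method only creates the handler and routes failures to onError.
class DemoService(conf: Map[String, String]) {
  def config(
      request: DemoConfigRequest,
      responseObserver: DemoObserver[DemoConfigResponse]): Unit =
    try new DemoConfigHandler(responseObserver, conf).handle(request)
    catch { case NonFatal(e) => responseObserver.onError(e) }
}

// Observer that records what it receives, useful for trying the sketch out.
class RecordingObserver[T] extends DemoObserver[T] {
  val received: ArrayBuffer[T] = ArrayBuffer.empty[T]
  var completed: Boolean = false
  var error: Option[Throwable] = None
  def onNext(value: T): Unit = received += value
  def onError(t: Throwable): Unit = error = Some(t)
  def onCompleted(): Unit = completed = true
}
```

Keeping the service method this thin means each RPC's logic can be exercised by constructing its handler directly with a recording observer.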

Interceptor System

The server supports gRPC interceptors for cross-cutting concerns.

SparkConnectInterceptorRegistry

object SparkConnectInterceptorRegistry {
  def chainInterceptors(sb: NettyServerBuilder): Unit
  def createConfiguredInterceptors(): Seq[ServerInterceptor]
}

Built-in Interceptors

class LoggingInterceptor extends ServerInterceptor

class LocalPropertiesCleanupInterceptor extends ServerInterceptor
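
A custom interceptor implements the standard grpc-java ServerInterceptor interface. The sketch below counts incoming calls before passing them down the chain. CallCountingInterceptor is a hypothetical name, and the example assumes grpc-java is on the classpath and that configured interceptors are instantiated through a zero-argument constructor.

```scala
import java.util.concurrent.atomic.AtomicLong

import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

// Hypothetical interceptor: counts every call, then continues the chain unchanged.
class CallCountingInterceptor extends ServerInterceptor {
  private val calls = new AtomicLong(0L)

  // Number of calls seen so far.
  def callCount: Long = calls.get()

  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    calls.incrementAndGet()
    // Delegating to next.startCall keeps the request flowing to the service.
    next.startCall(call, headers)
  }
}
```

An interceptor like this would typically be registered by class name through the spark.connect.grpc.interceptor.classes setting rather than constructed by hand.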

Usage Examples

Embedded Server

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Create Spark session.
// SparkConnectPlugin is a Spark plugin ("spark.plugins"), not a SQL extension.
// Registering it would start the server automatically, so it is left out here
// because the server is started explicitly below.
val spark = SparkSession.builder()
  .appName("MyApp")
  .getOrCreate()

// Start Connect server
SparkConnectService.start(spark.sparkContext)

// Server is now listening for client connections
println("Connect server started")

// Later, shut down the server, waiting up to 30 seconds for termination
SparkConnectService.stop(Some(30L), Some(TimeUnit.SECONDS))

Server Status and Monitoring

import org.apache.spark.sql.connect.service.SparkConnectService

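// Check active executions
SparkConnectService.listActiveExecutions match {
  // Left is returned when nothing is running; it carries the timestamp
  // (epoch milliseconds) of the last execution activity, not a count.
  case Left(lastActiveMs) => println(s"No active executions (last activity at $lastActiveMs ms)")
  case Right(executions) =>
    executions.foreach { exec =>
      println(s"Execution ${exec.operationId}: ${exec.status}")
    }
}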

// Get server instance
val server = SparkConnectService.server
println(s"Server port: ${server.getPort}")
println(s"Server services: ${server.getServices.size()}")

Configuration

Server behavior is controlled through Spark configuration properties. See the Configuration documentation for details on available settings including:

  • Binding port and network settings
  • Message size limits and timeouts
  • Plugin class configuration
  • UI and monitoring settings
  • Security and authentication options
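
As an illustration of the first few categories, the sketch below collects some Spark Connect settings and renders them as --conf arguments for a launch command. The keys are standard Spark Connect properties, but the values are examples only, and ConnectServerSettings and the interceptor class name are hypothetical.

```scala
object ConnectServerSettings {
  // Example values only; adjust to the deployment.
  val settings: Seq[(String, String)] = Seq(
    // Port the gRPC server binds to.
    "spark.connect.grpc.binding.port" -> "15002",
    // Largest inbound gRPC message, in bytes (128 MiB here).
    "spark.connect.grpc.maxInboundMessageSize" -> (128L * 1024 * 1024).toString,
    // Hypothetical class name, shown only to illustrate the format.
    "spark.connect.grpc.interceptor.classes" -> "com.example.CallCountingInterceptor"
  )

  // Renders each setting as a "--conf key=value" pair of arguments.
  def toSubmitArgs(kv: Seq[(String, String)]): Seq[String] =
    kv.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

The same key/value pairs could equally be applied to a SparkConf or a SparkSession builder before the server starts.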

Error Handling

All request handlers use centralized error handling through the ErrorUtils utility, which:

  • Converts Spark exceptions to appropriate gRPC status codes
  • Sanitizes error messages for client consumption
  • Logs detailed error information for debugging
  • Provides structured error responses with user and session context
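
The general idea behind these steps can be sketched without gRPC: classify the exception, trim the message, and attach user and session context. This is an illustration only, not the ErrorUtils implementation. DemoCode stands in for gRPC status codes, and the exception-to-code mapping and the length limit are assumptions.

```scala
// Stand-in for a few gRPC status codes; the real server uses io.grpc.Status.
sealed trait DemoCode
object DemoCode {
  case object InvalidArgument extends DemoCode
  case object Unimplemented extends DemoCode
  case object Internal extends DemoCode
}

// Structured error carrying user and session context.
final case class DemoError(code: DemoCode, message: String, userId: String, sessionId: String)

object DemoErrorMapper {
  // Arbitrary limit chosen for this sketch.
  private val MaxMessageLength = 200

  // Keeps only the first line of the message and caps its length.
  def sanitize(msg: String): String =
    Option(msg).getOrElse("").split("\n").headOption.getOrElse("").take(MaxMessageLength)

  // Assumed mapping from exception type to status code.
  def toError(t: Throwable, userId: String, sessionId: String): DemoError = {
    val code = t match {
      case _: IllegalArgumentException => DemoCode.InvalidArgument
      case _: UnsupportedOperationException => DemoCode.Unimplemented
      case _ => DemoCode.Internal
    }
    DemoError(code, sanitize(t.getMessage), userId, sessionId)
  }
}
```

Centralizing the mapping in one place means every handler reports failures in the same shape, while the full exception can still be logged on the server side.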