This document covers the core server functionality including lifecycle management, service configuration, and standalone deployment options.
Main gRPC service implementation that handles all client requests.

```scala
class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  def executePlan(request: proto.ExecutePlanRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def analyzePlan(request: proto.AnalyzePlanRequest, responseObserver: StreamObserver[proto.AnalyzePlanResponse]): Unit
  def config(request: proto.ConfigRequest, responseObserver: StreamObserver[proto.ConfigResponse]): Unit
  def addArtifacts(responseObserver: StreamObserver[proto.AddArtifactsResponse]): StreamObserver[proto.AddArtifactsRequest]
  def artifactStatus(request: proto.ArtifactStatusesRequest, responseObserver: StreamObserver[proto.ArtifactStatusesResponse]): Unit
  def interrupt(request: proto.InterruptRequest, responseObserver: StreamObserver[proto.InterruptResponse]): Unit
  def reattachExecute(request: proto.ReattachExecuteRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def releaseExecute(request: proto.ReleaseExecuteRequest, responseObserver: StreamObserver[proto.ReleaseExecuteResponse]): Unit
}
```
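Each RPC method is a thin entry point that delegates to one of the dedicated handler classes listed later in this section. A minimal sketch of the delegation pattern (simplified; the real implementation also routes failures through centralized error handling):

```scala
// Sketch of the delegation pattern inside SparkConnectService: each RPC
// method builds the matching handler around its response observer and
// hands it the request. Error handling via ErrorUtils is omitted here.
def executePlan(
    request: proto.ExecutePlanRequest,
    responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
  new SparkConnectExecutePlanHandler(responseObserver).handle(request)
}
```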
Server lifecycle management and session utilities.

```scala
object SparkConnectService {
  def start(sc: SparkContext): Unit
  def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def server: Server
  def uiTab: Option[SparkConnectServerTab]
}
```
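Sessions are isolated per (user, session) pair. A short sketch of the two session accessors; the identifiers are illustrative:

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Create the isolated session on first use, or fetch it if it already exists
val holder = SparkConnectService.getOrCreateIsolatedSession("user-1", "session-abc")

// Unlike the getOrCreate variant, this accessor expects the session
// to already exist for the given user and session id
val existing = SparkConnectService.getIsolatedSession("user-1", "session-abc")
```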
Standalone server application entry point.

```scala
object SparkConnectServer {
  def main(args: Array[String]): Unit
}
```
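Once a standalone server is running (Spark Connect listens on port 15002 by default), a Connect client in a separate process can attach to it. A sketch using the Scala client's remote builder; host and port here are assumptions based on the defaults:

```scala
import org.apache.spark.sql.SparkSession

// Connect to a running Spark Connect server over gRPC
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .getOrCreate()

// Queries are planned locally and executed on the remote server
spark.range(5).show()
```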
Simplified service for testing and development.

```scala
object SimpleSparkConnectService {
  def main(args: Array[String]): Unit
}
```

The server delegates request processing to specialized handler classes:
```scala
class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ExecutePlanRequest): Unit
}

class SparkConnectAnalyzeHandler(responseObserver: StreamObserver[proto.AnalyzePlanResponse]) {
  def handle(request: proto.AnalyzePlanRequest): Unit
}

class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) {
  def handle(request: proto.ConfigRequest): Unit
}

class SparkConnectAddArtifactsHandler(responseObserver: StreamObserver[proto.AddArtifactsResponse]) {
  def handle(request: proto.AddArtifactsRequest): Unit
}

class SparkConnectArtifactStatusesHandler(responseObserver: StreamObserver[proto.ArtifactStatusesResponse]) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}

class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.InterruptResponse]) {
  def handle(request: proto.InterruptRequest): Unit
}

class SparkConnectReattachExecuteHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ReattachExecuteRequest): Unit
}

class SparkConnectReleaseExecuteHandler(responseObserver: StreamObserver[proto.ReleaseExecuteResponse]) {
  def handle(request: proto.ReleaseExecuteRequest): Unit
}
```
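Each handler owns the response observer for its RPC: it processes the request, emits one or more responses, and completes the stream. A hypothetical sketch of that flow for a unary call (the real SparkConnectConfigHandler's internals differ):

```scala
class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) {
  def handle(request: proto.ConfigRequest): Unit = {
    // Apply the requested config operation to the session (omitted),
    // then emit exactly one response and complete the unary call.
    val response = proto.ConfigResponse.newBuilder()
      .setSessionId(request.getSessionId)
      .build()
    responseObserver.onNext(response)
    responseObserver.onCompleted()
  }
}
```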
The server supports gRPC interceptors for cross-cutting concerns.

```scala
object SparkConnectInterceptorRegistry {
  def chainInterceptors(sb: NettyServerBuilder): Unit
  def createConfiguredInterceptors(): Seq[ServerInterceptor]
}

class LoggingInterceptor extends ServerInterceptor
class LocalPropertiesCleanupInterceptor extends ServerInterceptor
```
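Custom interceptors implement the standard io.grpc.ServerInterceptor interface and are picked up by the registry via the spark.connect.grpc.interceptor.classes property (an assumption based on the registry's design; verify against your Spark version). A hypothetical sketch:

```scala
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

// Hypothetical interceptor that logs the full method name of every call
// before passing it down the interceptor chain unchanged.
class CallLoggingInterceptor extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    println(s"gRPC call: ${call.getMethodDescriptor.getFullMethodName}")
    next.startCall(call, headers)
  }
}
```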
Example: starting the Connect server inside an existing Spark application and shutting it down later.

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Create a Spark session. (Alternatively, setting
// spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
// starts the Connect server automatically at driver startup.)
val spark = SparkSession.builder()
  .appName("MyApp")
  .getOrCreate()

// Start the Connect server
SparkConnectService.start(spark.sparkContext)

// Server is now listening for client connections
println("Connect server started")

// Later, shut down the server
SparkConnectService.stop(Some(30), Some(TimeUnit.SECONDS))
```
Example: inspecting active executions and the underlying gRPC server.

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Check active executions: per the Either[Long, Seq[ExecuteInfo]] signature,
// Left carries a timestamp when no executions are active, Right the active list
SparkConnectService.listActiveExecutions match {
  case Left(lastRemovedAt) => println(s"No active executions (last removed at $lastRemovedAt)")
  case Right(executions) =>
    executions.foreach { exec =>
      println(s"Execution ${exec.executeId}: ${exec.status}")
    }
}

// Get the underlying gRPC server instance
val server = SparkConnectService.server
println(s"Server port: ${server.getPort}")
println(s"Server services: ${server.getServices.size()}")
```

Server behavior is controlled through Spark configuration properties. See the Configuration documentation for details on available settings, including:
All request handlers use centralized error handling through the ErrorUtils utility, which: