A decoupled client-server component for Apache Spark that enables remote connectivity to Spark clusters through the DataFrame API over the gRPC protocol.
This document covers the core server functionality including lifecycle management, service configuration, and standalone deployment options.
Main gRPC service implementation that handles all client requests.
```scala
class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  def executePlan(request: proto.ExecutePlanRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def analyzePlan(request: proto.AnalyzePlanRequest, responseObserver: StreamObserver[proto.AnalyzePlanResponse]): Unit
  def config(request: proto.ConfigRequest, responseObserver: StreamObserver[proto.ConfigResponse]): Unit
  def addArtifacts(responseObserver: StreamObserver[proto.AddArtifactsResponse]): StreamObserver[proto.AddArtifactsRequest]
  def artifactStatus(request: proto.ArtifactStatusesRequest, responseObserver: StreamObserver[proto.ArtifactStatusesResponse]): Unit
  def interrupt(request: proto.InterruptRequest, responseObserver: StreamObserver[proto.InterruptResponse]): Unit
  def reattachExecute(request: proto.ReattachExecuteRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def releaseExecute(request: proto.ReleaseExecuteRequest, responseObserver: StreamObserver[proto.ReleaseExecuteResponse]): Unit
}
```

Server lifecycle management and session utilities.
```scala
object SparkConnectService {
  def start(sc: SparkContext): Unit
  def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def server: Server
  def uiTab: Option[SparkConnectServerTab]
}
```

Standalone server application entry point.
```scala
object SparkConnectServer {
  def main(args: Array[String]): Unit
}
```

Simplified service for testing and development.
```scala
object SimpleSparkConnectService {
  def main(args: Array[String]): Unit
}
```

The server delegates request processing to specialized handler classes:
```scala
class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ExecutePlanRequest): Unit
}

class SparkConnectAnalyzeHandler(responseObserver: StreamObserver[proto.AnalyzePlanResponse]) {
  def handle(request: proto.AnalyzePlanRequest): Unit
}

class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) {
  def handle(request: proto.ConfigRequest): Unit
}

class SparkConnectAddArtifactsHandler(responseObserver: StreamObserver[proto.AddArtifactsResponse]) {
  def handle(request: proto.AddArtifactsRequest): Unit
}

class SparkConnectArtifactStatusesHandler(responseObserver: StreamObserver[proto.ArtifactStatusesResponse]) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}

class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.InterruptResponse]) {
  def handle(request: proto.InterruptRequest): Unit
}

class SparkConnectReattachExecuteHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ReattachExecuteRequest): Unit
}

class SparkConnectReleaseExecuteHandler(responseObserver: StreamObserver[proto.ReleaseExecuteResponse]) {
  def handle(request: proto.ReleaseExecuteRequest): Unit
}
```

The server supports gRPC interceptors for cross-cutting concerns.
```scala
object SparkConnectInterceptorRegistry {
  def chainInterceptors(sb: NettyServerBuilder): Unit
  def createConfiguredInterceptors(): Seq[ServerInterceptor]
}

class LoggingInterceptor extends ServerInterceptor
class LocalPropertiesCleanupInterceptor extends ServerInterceptor
```

Example: embedding the Connect server in a Spark application.

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Create Spark session
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .getOrCreate()

// Start Connect server
SparkConnectService.start(spark.sparkContext)

// Server is now listening for client connections
println("Connect server started")

// Later, shut down the server
SparkConnectService.stop(Some(30), Some(TimeUnit.SECONDS))
```

Example: inspecting the running server.

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Check active executions
SparkConnectService.listActiveExecutions match {
  case Left(count) => println(s"Active executions: $count")
  case Right(executions) =>
    executions.foreach { exec =>
      println(s"Execution ${exec.executeId}: ${exec.status}")
    }
}

// Get server instance
val server = SparkConnectService.server
println(s"Server port: ${server.getPort}")
println(s"Server services: ${server.getServices.size()}")
```

Server behavior is controlled through Spark configuration properties. See the Configuration documentation for details on available settings.
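As an illustration of configuration-driven behavior, a standalone launch can pass Connect-specific properties on the command line. The script path and property names below are assumptions drawn from a typical Spark distribution; verify them against your version's configuration reference:

```shell
# Launch the standalone Connect server with explicit gRPC settings
# (property keys assumed; defaults shown are illustrative)
./sbin/start-connect-server.sh \
  --conf spark.connect.grpc.binding.port=15002 \
  --conf spark.connect.grpc.maxInboundMessageSize=134217728
```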
All request handlers use centralized error handling through the ErrorUtils utility.
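As a minimal sketch of that centralized pattern (the helper name and signature here are hypothetical, not the actual ErrorUtils API): a handler body runs inside a wrapper that routes any failure to a single error-response path instead of letting exceptions escape to the transport layer.

```scala
// Hypothetical sketch of centralized error handling in the ErrorUtils style.
object ErrorHandlingSketch {
  // Run `body`; on failure, hand the operation name and cause to one
  // shared error path instead of propagating the exception.
  def handled[A](operation: String)(body: => A)(onError: (String, Throwable) => A): A =
    try body
    catch { case e: Throwable => onError(operation, e) }
}
```

Each handler's `handle` method would then only contain happy-path logic, with error translation defined once for all RPCs.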
Install with Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-spark--spark-connect-2-13
```