A decoupled client-server architecture component for Apache Spark that enables remote connectivity to Spark clusters using the DataFrame API and gRPC protocol.
```shell
npx @tessl/cli install tessl/maven-org-apache-spark--spark-connect_2-13@3.5.0
```

Apache Spark Connect Server provides a decoupled client-server architecture that enables remote connectivity to Spark clusters, using the DataFrame API and unresolved logical plans as the protocol. The server is a gRPC service that receives requests from Spark Connect clients and executes them on the Spark cluster, so Spark can be leveraged from a variety of environments: modern data applications, IDEs, notebooks, and different programming languages.
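For example, once a server is running, a JVM client can connect to it remotely. This is a hedged sketch: the `remote(...)` builder method comes from the separate `spark-connect-client-jvm` artifact (not this server module), and the connection string assumes the default port.

```scala
import org.apache.spark.sql.SparkSession

// Connect to a remote Spark Connect server via a sc:// URI
// (15002 is the documented default port).
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .getOrCreate()

// DataFrame operations are sent to the server as unresolved logical plans.
spark.range(5).collect()
```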
Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-connect_2.13</artifactId>
  <version>3.5.6</version>
</dependency>
```

SBT:
```scala
libraryDependencies += "org.apache.spark" %% "spark-connect" % "3.5.6"
```

Commonly used imports:

```scala
import org.apache.spark.sql.connect.service.{SparkConnectService, SparkConnectServer}
import org.apache.spark.sql.connect.plugin.{RelationPlugin, ExpressionPlugin, CommandPlugin}
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.config.Connect
```

Start the Connect service inside an existing Spark application:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Start a Spark session
val session = SparkSession.builder.getOrCreate()

// Start the Connect server
SparkConnectService.start(session.sparkContext)

// The server is now listening on the configured port (default: 15002)
```

Or run a standalone server:

```scala
import org.apache.spark.sql.connect.service.SparkConnectServer

// Run a standalone server
SparkConnectServer.main(Array.empty)
```

The Spark Connect Server architecture consists of several key layers:
Core server functionality including startup, configuration, and lifecycle management.
```scala
object SparkConnectService {
  def start(sc: SparkContext): Unit
  def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit
}

object SparkConnectServer {
  def main(args: Array[String]): Unit
}
```

Extensible plugin architecture for custom relations, expressions, and commands.
```scala
trait RelationPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[LogicalPlan]
}

trait ExpressionPlugin {
  def transform(expression: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Expression]
}

trait CommandPlugin {
  def process(command: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Unit]
}
```

Convert protocol buffer plans to Catalyst plans and manage the execution lifecycle.
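The plugin traits above receive the extension payload together with the active planner, so a plugin can decide whether a message is addressed to it. A hedged sketch of a `CommandPlugin`: the message type `MyCustomCommand` and the class names are hypothetical, and registration via the `spark.connect.extensions.command.classes` setting is an assumption.

```scala
import com.google.protobuf.{Any => ProtoAny}
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.plugin.CommandPlugin

// MyCustomCommand is a hypothetical, user-defined protobuf message type.
class MyCommandPlugin extends CommandPlugin {
  override def process(command: ProtoAny, planner: SparkConnectPlanner): Option[Unit] = {
    if (command.is(classOf[MyCustomCommand])) {
      val cmd = command.unpack(classOf[MyCustomCommand])
      // ... execute custom logic for cmd here ...
      Some(()) // signal that this plugin handled the command
    } else {
      None // not ours; defer to other plugins or the default planner
    }
  }
}
```

Returning `None` passes the payload on; analogous settings are assumed for relation and expression plugins.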
```scala
class SparkConnectPlanner(sessionHolder: SessionHolder) {
  def transformRelation(rel: proto.Relation): LogicalPlan
  def transformExpression(exp: proto.Expression): Expression
  def process(
      command: proto.Command,
      responseObserver: StreamObserver[ExecutePlanResponse],
      executeHolder: ExecuteHolder): Unit
}
```

Manage client sessions, execution state, and concurrent operations.
```scala
object SparkConnectService {
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
}
```

Handle JAR uploads, file management, and dynamic class loading for user code.
```scala
class SparkConnectArtifactManager(sessionHolder: SessionHolder) {
  def getSparkConnectAddedJars: Seq[URL]
  def getSparkConnectPythonIncludes: Seq[String]
  def classloader: ClassLoader
}
```

Web interface components for server monitoring, session tracking, and debugging.
```scala
class SparkConnectServerTab(
    sparkContext: SparkContext,
    store: SparkConnectServerAppStatusStore,
    appName: String) {
  def detach(): Unit
  def displayOrder: Int
}
```

Comprehensive configuration options for server behavior, security, and performance tuning.
```scala
object Connect {
  val CONNECT_GRPC_BINDING_PORT: ConfigEntry[Int]
  val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE: ConfigEntry[Int]
  val CONNECT_EXECUTE_REATTACHABLE_ENABLED: ConfigEntry[Boolean]
  // ... additional configuration entries
}
```

The server uses centralized error handling through the `ErrorUtils` object, which converts Spark exceptions to appropriate gRPC status codes and error messages for client consumption.
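Putting the configuration entries above to use, here is a sketch of starting a tuned server. The string key `spark.connect.grpc.binding.port` is assumed to be the key behind `CONNECT_GRPC_BINDING_PORT`; the value is illustrative, and the setting is assumed to take effect only when applied before the service starts.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Sketch: configure the Connect server before the session is created.
val spark = SparkSession.builder()
  .appName("connect-server")
  .config("spark.connect.grpc.binding.port", "15999") // assumed key for CONNECT_GRPC_BINDING_PORT
  .getOrCreate()

SparkConnectService.start(spark.sparkContext)
// ... serve clients ...
SparkConnectService.stop()
```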