The plan processing system converts protocol buffer messages from Connect clients into Catalyst logical plans that can be executed by Spark. This includes relation transformation, expression conversion, and command processing.
Main planner class that handles conversion from protocol buffer plans to Catalyst plans.

```scala
class SparkConnectPlanner(sessionHolder: SessionHolder) {
  def transformRelation(rel: proto.Relation): LogicalPlan
  def transformExpression(exp: proto.Expression): Expression
  def process(
      command: proto.Command,
      responseObserver: StreamObserver[proto.ExecutePlanResponse],
      executeHolder: ExecuteHolder): Unit
}
```

Parameters:
- `sessionHolder`: The session context for plan processing

Key Methods:
- `transformRelation`: Converts a protocol buffer relation to a Catalyst `LogicalPlan`
- `transformExpression`: Converts a protocol buffer expression to a Catalyst `Expression`
- `process`: Processes a protocol buffer command, streaming responses to the client

Manages the execution lifecycle of Spark Connect plans.
```scala
class SparkConnectPlanExecution(
    executeHolder: ExecuteHolder,
    sessionHolder: SessionHolder,
    responseObserver: StreamObserver[proto.ExecutePlanResponse],
    request: proto.ExecutePlanRequest) {
  // Execution lifecycle methods (internal implementation)
}
```

Converts protocol buffer literals to Catalyst expressions.
```scala
object LiteralExpressionProtoConverter {
  def toCatalystValue(literal: proto.Expression.Literal): Any
  def toConnectProtoType(dt: DataType): proto.DataType
  def toCatalystType(dt: proto.DataType): DataType
}
```

Key Methods:
- `toCatalystValue`: Converts a protobuf literal to a Catalyst value
- `toConnectProtoType`: Converts a Catalyst `DataType` to a protobuf `DataType`
- `toCatalystType`: Converts a protobuf `DataType` to a Catalyst `DataType`

Converts protocol buffer save modes to Spark save modes.
```scala
object SaveModeConverter {
  def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode
  def toProto(mode: SaveMode): proto.WriteOperation.SaveMode
}
```

Converts table save method configurations.
```scala
object TableSaveMethodConverter {
  def toSaveMethod(method: proto.WriteOperation.SaveTable.TableSaveMethod): String
}
```
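A minimal usage sketch; the enum value below is assumed from the Connect protocol definition and may differ by Spark version:

```scala
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.planner.TableSaveMethodConverter

// Assumed enum value: maps the protobuf save method to the string name
// used for DataFrameWriter-style table writes.
val method = proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE
val methodName: String = TableSaveMethodConverter.toSaveMethod(method)
```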
Holds execution state and manages the execution lifecycle.

```scala
class ExecuteHolder(
    executeId: String,
    request: proto.ExecutePlanRequest,
    sessionHolder: SessionHolder) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  // Additional state management methods (internal)
}
```

Manages execution in separate threads for concurrent processing.
```scala
class ExecuteThreadRunner(
    executeHolder: ExecuteHolder,
    responseObserver: ExecuteResponseObserver[proto.ExecutePlanResponse]) extends Runnable {
  def run(): Unit
}
```
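Since `ExecuteThreadRunner` is a `Runnable`, dispatch might look like the following sketch; `executeHolder` and `observer` are placeholders assumed to come from the surrounding request handling:

```scala
// Run the execution on its own daemon thread so the gRPC handler
// thread is not blocked while the plan executes.
val runner = new ExecuteThreadRunner(executeHolder, observer)
val thread = new Thread(runner, s"connect-execute-${executeHolder.executeId}")
thread.setDaemon(true)
thread.start()
```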
Observes and streams execution responses to clients.

```scala
class ExecuteResponseObserver[T](
    userId: String,
    sessionId: String,
    responseObserver: StreamObserver[T]) extends StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}
```
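A wiring sketch, assuming `grpcStream` is the raw `StreamObserver` obtained from the gRPC layer:

```scala
// Wrap the client stream so responses are tracked per user and session
// before being forwarded downstream.
val observer = new ExecuteResponseObserver[proto.ExecutePlanResponse](
  userId = "user-1",         // placeholder
  sessionId = "session-abc", // placeholder
  responseObserver = grpcStream)

observer.onNext(proto.ExecutePlanResponse.newBuilder().build())
observer.onCompleted()
```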
Sends execution responses via gRPC streaming.

```scala
class ExecuteGrpcResponseSender(
    executeHolder: ExecuteHolder,
    responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def sendResponse(response: proto.ExecutePlanResponse): Unit
  def sendError(error: Throwable): Unit
  def sendCompleted(): Unit
}
```
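A send-loop sketch; the error handling here is illustrative, not the server's actual policy:

```scala
val sender = new ExecuteGrpcResponseSender(executeHolder, grpcStream)
try {
  // Stream each result batch, then signal completion.
  sender.sendResponse(proto.ExecutePlanResponse.newBuilder().build())
  sender.sendCompleted()
} catch {
  case t: Throwable => sender.sendError(t)
}
```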
Caches streaming responses for reattachable executions.

```scala
class CachedStreamResponse(
    executeId: String,
    maxCacheSize: Int = 1000) {
  def addResponse(response: proto.ExecutePlanResponse): Unit
  def getResponses(fromIndex: Int): Seq[proto.ExecutePlanResponse]
  def size: Int
}
```
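A replay sketch for reattachable executions; `grpcStream` is again a placeholder for the client stream:

```scala
val cache = new CachedStreamResponse(executeId = "exec-123")

// Record every response as it is produced.
cache.addResponse(proto.ExecutePlanResponse.newBuilder().build())

// On reattach, replay everything the client has not yet seen.
cache.getResponses(fromIndex = 0).foreach(grpcStream.onNext)
```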
Helper for streaming foreachBatch operations.

```scala
object StreamingForeachBatchHelper {
  def foreachBatch(
      pythonExec: proto.PythonUDF,
      sessionHolder: SessionHolder): (Dataset[Row], Long) => Unit
}
```
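A usage sketch, assuming `pythonUdf` carries the serialized client function and `df` is the streaming DataFrame being written:

```scala
import org.apache.spark.sql.{Dataset, Row}

// Build a (batch, batchId) => Unit function from the client's Python UDF
// and hand it to the streaming writer.
val batchFn: (Dataset[Row], Long) => Unit =
  StreamingForeachBatchHelper.foreachBatch(pythonUdf, sessionHolder)

val query = df.writeStream
  .foreachBatch(batchFn)
  .start()
```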
Helper for streaming query listener management.

```scala
object StreamingQueryListenerHelper {
  def addListener(
      query: StreamingQuery,
      listener: proto.StreamingQueryListener,
      sessionHolder: SessionHolder): StreamingQueryListener
}
```
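A registration sketch; `protoListener` is a placeholder for the client-provided listener definition:

```scala
// Convert the protobuf listener description into a server-side
// StreamingQueryListener attached to the running query.
val serverListener =
  StreamingQueryListenerHelper.addListener(query, protoListener, sessionHolder)
```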
The following example transforms a relation and an expression received from a client:

```scala
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto
// Create planner instance
val planner = new SparkConnectPlanner(sessionHolder)
// Transform a relation
val protoRelation: proto.Relation = ??? // ... from client request
val logicalPlan = planner.transformRelation(protoRelation)

// Transform an expression
val protoExpression: proto.Expression = ??? // ... from client request
val catalystExpr = planner.transformExpression(protoExpression)
```

The following example processes a command and streams responses back to the client:

```scala
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto
import io.grpc.stub.StreamObserver
// Set up response observer
val responseObserver: StreamObserver[proto.ExecutePlanResponse] = ??? // ... from gRPC
// Create execute holder for state management
val executeHolder = new ExecuteHolder(executeId, request, sessionHolder)
// Process command with streaming response
val planner = new SparkConnectPlanner(sessionHolder)
planner.process(request.getPlan.getCommand, responseObserver, executeHolder)
```

The following example converts literals and data types between protobuf and Catalyst representations:

```scala
import org.apache.spark.sql.connect.planner.LiteralExpressionProtoConverter
import org.apache.spark.connect.proto
import org.apache.spark.sql.types.{DataType, StringType}

// Convert protobuf literal to Catalyst value
val protoLiteral: proto.Expression.Literal = ??? // ... from client
val catalystValue = LiteralExpressionProtoConverter.toCatalystValue(protoLiteral)
// Convert Catalyst DataType to protobuf
val catalystType: DataType = StringType
val protoType = LiteralExpressionProtoConverter.toConnectProtoType(catalystType)
```

The following example converts save modes between their protobuf and Spark representations:

```scala
import org.apache.spark.sql.connect.planner.SaveModeConverter
import org.apache.spark.connect.proto
// Convert protobuf save mode to Spark SaveMode
val protoSaveMode = proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
val sparkSaveMode = SaveModeConverter.toSaveMode(protoSaveMode)
// Convert Spark SaveMode back to protobuf
val backToProto = SaveModeConverter.toProto(sparkSaveMode)
```

Plan processing uses structured error handling to provide meaningful error messages to clients.
All errors are converted to appropriate gRPC status codes and include context about the failing operation.
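As an illustration of this pattern (not the server's actual error-translation utility), a handler might map failures to gRPC status codes like so:

```scala
import io.grpc.{Status, StatusRuntimeException}
import io.grpc.stub.StreamObserver

// Illustrative only: translate a failure into a gRPC status that names
// the failing operation, then terminate the client stream with it.
def sendFailure[T](
    observer: StreamObserver[T],
    operation: String,
    t: Throwable): Unit = {
  val status = t match {
    case _: IllegalArgumentException => Status.INVALID_ARGUMENT
    case _ => Status.INTERNAL
  }
  observer.onError(
    new StatusRuntimeException(
      status.withDescription(s"$operation failed: ${t.getMessage}").withCause(t)))
}
```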
The plan processing system also provides extension points: relation, expression, and command plugins can be registered with the server to handle custom protocol messages without modifying the core planner.