Plan Processing

The plan processing system converts protocol buffer messages from Connect clients into Catalyst logical plans that can be executed by Spark. This includes relation transformation, expression conversion, and command processing.

Core Planner

SparkConnectPlanner

Main planner class that handles conversion from protocol buffer plans to Catalyst plans.

class SparkConnectPlanner(sessionHolder: SessionHolder) {
  def transformRelation(rel: proto.Relation): LogicalPlan
  def transformExpression(exp: proto.Expression): Expression
  def process(command: proto.Command, responseObserver: StreamObserver[proto.ExecutePlanResponse], executeHolder: ExecuteHolder): Unit
}

Parameters:

  • sessionHolder: The session context for plan processing

Key Methods:

  • transformRelation: Converts protocol buffer relations to Catalyst LogicalPlan
  • transformExpression: Converts protocol buffer expressions to Catalyst Expression
  • process: Processes protocol buffer commands with streaming response

Plan Execution

SparkConnectPlanExecution

Manages the execution lifecycle of Spark Connect plans.

class SparkConnectPlanExecution(
  executeHolder: ExecuteHolder,
  sessionHolder: SessionHolder,
  responseObserver: StreamObserver[proto.ExecutePlanResponse],
  request: proto.ExecutePlanRequest
) {
  // Execution lifecycle methods (internal implementation)
}

Expression and Type Converters

LiteralExpressionProtoConverter

Converts protocol buffer literals to Catalyst expressions.

object LiteralExpressionProtoConverter {
  def toCatalystValue(literal: proto.Expression.Literal): Any
  def toConnectProtoType(dt: DataType): proto.DataType
  def toCatalystType(dt: proto.DataType): DataType
}

Key Methods:

  • toCatalystValue: Convert protobuf literal to Catalyst value
  • toConnectProtoType: Convert Catalyst DataType to protobuf DataType
  • toCatalystType: Convert protobuf DataType to Catalyst DataType

SaveModeConverter

Converts between protocol buffer save modes and Spark SaveMode values.

object SaveModeConverter {
  def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode
  def toProto(mode: SaveMode): proto.WriteOperation.SaveMode
}

TableSaveMethodConverter

Converts protocol buffer table save methods to the string method names used when saving tables.

object TableSaveMethodConverter {
  def toSaveMethod(method: proto.WriteOperation.SaveTable.TableSaveMethod): String
}
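
For example, a hedged sketch of converting a table save method; the enum value name is taken from the Spark Connect proto and shown for illustration:

```scala
import org.apache.spark.sql.connect.planner.TableSaveMethodConverter
import org.apache.spark.connect.proto

// Convert a protobuf table save method to its string method name.
// TABLE_SAVE_METHOD_SAVE_AS_TABLE is used here for illustration.
val method = proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE
val methodName = TableSaveMethodConverter.toSaveMethod(method)
```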

Execution Infrastructure

ExecuteHolder

Holds execution state and manages the execution lifecycle.

class ExecuteHolder(
  executeId: String,
  request: proto.ExecutePlanRequest,
  sessionHolder: SessionHolder
) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  // Additional state management methods (internal)
}

ExecuteThreadRunner

Runs each execution on a dedicated thread so that multiple requests can be processed concurrently.

class ExecuteThreadRunner(
  executeHolder: ExecuteHolder,
  responseObserver: ExecuteResponseObserver[proto.ExecutePlanResponse]
) extends Runnable {
  def run(): Unit
}
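
A hedged sketch of launching an execution on its own thread, relying only on the Runnable contract shown above; executeHolder and responseObserver are assumed to come from the surrounding handler, and the thread name is illustrative:

```scala
// Each execution gets a dedicated thread; run() drives the plan to completion,
// keeping long-running executions from blocking the gRPC event loop.
val runner = new ExecuteThreadRunner(executeHolder, responseObserver)
new Thread(runner, s"SparkConnectExecuteThread-${executeHolder.executeId}").start()
```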

ExecuteResponseObserver

Observes and streams execution responses to clients.

class ExecuteResponseObserver[T](
  userId: String,
  sessionId: String,
  responseObserver: StreamObserver[T]
) extends StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}
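
Since ExecuteResponseObserver is itself a StreamObserver, the standard gRPC callback sequence applies. A hedged sketch, using the constructor shown above; userId, sessionId, grpcObserver, and response are assumed to come from the surrounding handler:

```scala
import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto

// Wrap the client's gRPC stream; userId and sessionId identify the caller.
val observer = new ExecuteResponseObserver[proto.ExecutePlanResponse](
  userId, sessionId, grpcObserver)

// Responses follow the StreamObserver contract: zero or more onNext calls,
// then exactly one of onCompleted or onError.
observer.onNext(response)
observer.onCompleted()
```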

ExecuteGrpcResponseSender

Sends execution responses via gRPC streaming.

class ExecuteGrpcResponseSender(
  executeHolder: ExecuteHolder,
  responseObserver: StreamObserver[proto.ExecutePlanResponse]
) {
  def sendResponse(response: proto.ExecutePlanResponse): Unit
  def sendError(error: Throwable): Unit
  def sendCompleted(): Unit
}

CachedStreamResponse

Caches streaming responses for reattachable executions.

class CachedStreamResponse(
  executeId: String,
  maxCacheSize: Int = 1000
) {
  def addResponse(response: proto.ExecutePlanResponse): Unit
  def getResponses(fromIndex: Int): Seq[proto.ExecutePlanResponse]
  def size: Int
}
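
A hedged sketch of how a reattaching client might be served from the cache, using only the methods listed above; executeId, response, lastSeenIndex, and grpcObserver are assumed from context:

```scala
val cache = new CachedStreamResponse(executeId, maxCacheSize = 1000)

// During normal execution, cache each outgoing response.
cache.addResponse(response)

// On reattach, replay everything past the client's last acknowledged index.
val missed = cache.getResponses(fromIndex = lastSeenIndex + 1)
missed.foreach(grpcObserver.onNext)
```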

Streaming Support

StreamingForeachBatchHelper

Helper for streaming foreachBatch operations.

object StreamingForeachBatchHelper {
  def foreachBatch(
    pythonExec: proto.PythonUDF,
    sessionHolder: SessionHolder
  ): (Dataset[Row], Long) => Unit
}
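
A hedged sketch of using the helper: it returns a (Dataset[Row], Long) => Unit function, which matches the Scala signature of DataStreamWriter.foreachBatch. Here pythonUdf stands in for the client-provided Python function and df for a streaming Dataset:

```scala
import org.apache.spark.sql.{Dataset, Row}

// Build the per-batch function from the client's serialized Python UDF.
val batchFn: (Dataset[Row], Long) => Unit =
  StreamingForeachBatchHelper.foreachBatch(pythonUdf, sessionHolder)

// Attach it to a streaming write; it runs once per micro-batch.
val query = df.writeStream.foreachBatch(batchFn).start()
```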

StreamingQueryListenerHelper

Helper for registering client-defined streaming query listeners.

object StreamingQueryListenerHelper {
  def addListener(
    query: StreamingQuery,
    listener: proto.StreamingQueryListener,
    sessionHolder: SessionHolder
  ): StreamingQueryListener
}

Usage Examples

Basic Plan Processing

import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto

// Create planner instance
val planner = new SparkConnectPlanner(sessionHolder)

// Transform a relation
val protoRelation: proto.Relation = // ... from client request
val logicalPlan = planner.transformRelation(protoRelation)

// Transform an expression  
val protoExpression: proto.Expression = // ... from client request
val catalystExpr = planner.transformExpression(protoExpression)

Processing with Response Streaming

import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.service.ExecuteHolder
import org.apache.spark.connect.proto
import io.grpc.stub.StreamObserver

// Set up response observer
val responseObserver: StreamObserver[proto.ExecutePlanResponse] = // ... from gRPC

// Create execute holder for state management
val executeHolder = new ExecuteHolder(executeId, request, sessionHolder)

// Process command with streaming response
val planner = new SparkConnectPlanner(sessionHolder)
planner.process(request.getPlan.getCommand, responseObserver, executeHolder)

Custom Expression Conversion

import org.apache.spark.sql.connect.planner.LiteralExpressionProtoConverter
import org.apache.spark.connect.proto
import org.apache.spark.sql.types.{DataType, StringType}

// Convert protobuf literal to Catalyst value
val protoLiteral: proto.Expression.Literal = // ... from client
val catalystValue = LiteralExpressionProtoConverter.toCatalystValue(protoLiteral)

// Convert Catalyst DataType to protobuf
val catalystType: DataType = StringType
val protoType = LiteralExpressionProtoConverter.toConnectProtoType(catalystType)

Save Operation Configuration

import org.apache.spark.sql.connect.planner.SaveModeConverter
import org.apache.spark.connect.proto

// Convert protobuf save mode to Spark SaveMode
val protoSaveMode = proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
val sparkSaveMode = SaveModeConverter.toSaveMode(protoSaveMode)

// Convert Spark SaveMode back to protobuf
val backToProto = SaveModeConverter.toProto(sparkSaveMode)

Error Handling

Plan processing uses structured error handling to provide meaningful error messages to clients:

  • Validation Errors: Invalid protocol buffer structure or unsupported operations
  • Conversion Errors: Issues converting between protobuf and Catalyst representations
  • Execution Errors: Runtime errors during plan execution
  • Resource Errors: Memory limits, timeouts, or resource exhaustion

All errors are converted to appropriate gRPC status codes and include context about the failing operation.
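
A hedged sketch of the pattern described above, mapping failure categories onto gRPC status codes; the specific exception types and status choices are illustrative, not the exact mapping Spark uses:

```scala
import io.grpc.{Status, StatusRuntimeException}

// Illustrative mapping from internal failures to gRPC statuses.
def toGrpcError(t: Throwable): StatusRuntimeException = t match {
  case e: IllegalArgumentException =>      // validation errors
    Status.INVALID_ARGUMENT.withDescription(e.getMessage).asRuntimeException()
  case e: UnsupportedOperationException => // unsupported operations
    Status.UNIMPLEMENTED.withDescription(e.getMessage).asRuntimeException()
  case e =>                                // execution / resource errors
    Status.INTERNAL.withDescription(e.getMessage).withCause(e).asRuntimeException()
}
```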

Performance Considerations

Plan Caching

  • Logical plans may be cached to avoid repeated conversion overhead
  • Expression conversion results are cached for common operations
  • Plugin results are cached when applicable

Streaming Optimization

  • Large result sets use streaming responses to avoid memory issues
  • Reattachable executions cache intermediate results for fault tolerance
  • Response batching reduces network overhead

Concurrent Processing

  • Multiple client requests are processed concurrently
  • Thread-safe execution state management
  • Resource isolation between sessions

Extension Points

The plan processing system provides several extension points:

  1. Plugin System: Custom relations, expressions, and commands via plugin interfaces
  2. Custom Converters: Extend type conversion for domain-specific data types
  3. Execution Hooks: Custom processing during plan execution lifecycle
  4. Response Processing: Custom response formatting and streaming logic