Plugin System

The Spark Connect Server provides a plugin architecture for extending relation processing, expression evaluation, and command handling. Developers can add custom capabilities without modifying the core server implementation.

Plugin Interfaces

RelationPlugin

Extends relation processing in the planner to support custom data sources and operations.

trait RelationPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[LogicalPlan]
}

Parameters:

  • relation: Protocol buffer message containing the custom relation definition
  • planner: The SparkConnectPlanner instance for accessing conversion utilities

Returns:

  • Some(LogicalPlan) if the plugin handles this relation type
  • None if the plugin doesn't recognize the relation

ExpressionPlugin

Extends expression processing to support custom functions and operators.

trait ExpressionPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Expression]
}

Parameters:

  • relation: Protocol buffer message containing the custom expression definition
  • planner: The SparkConnectPlanner instance for accessing conversion utilities

Returns:

  • Some(Expression) if the plugin handles this expression type
  • None if the plugin doesn't recognize the expression

CommandPlugin

Extends command processing to support custom operations and administrative commands.

trait CommandPlugin {
  def process(command: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Unit]
}

Parameters:

  • command: Protocol buffer message containing the custom command definition
  • planner: The SparkConnectPlanner instance for accessing conversion utilities

Returns:

  • Some(()) if the plugin handles this command type
  • None if the plugin doesn't recognize the command

Plugin Registry

SparkConnectPluginRegistry

Central registry for managing all plugin types.

object SparkConnectPluginRegistry {
  def relationRegistry: Seq[RelationPlugin]
  def expressionRegistry: Seq[ExpressionPlugin]
  def commandRegistry: Seq[CommandPlugin]
  def createConfiguredPlugins[T](values: Seq[String]): Seq[T]
}

Properties:

  • relationRegistry: All registered relation plugins
  • expressionRegistry: All registered expression plugins
  • commandRegistry: All registered command plugins

Methods:

  • createConfiguredPlugins[T]: Creates plugin instances from configuration class names
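
Conceptually, createConfiguredPlugins[T] resolves each configured class name by reflection and instantiates it with a no-argument constructor. A minimal sketch of that idea (not the actual Spark implementation, which adds caching and error reporting):

def createConfiguredPlugins[T](values: Seq[String]): Seq[T] =
  values.map { className =>
    // Load the class by name and invoke its no-arg constructor.
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
  }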

Main Plugin Integration

SparkConnectPlugin

Integrates the Connect server with Spark's plugin system.

class SparkConnectPlugin extends SparkPlugin {
  def driverPlugin(): DriverPlugin
  def executorPlugin(): ExecutorPlugin
}
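
Because SparkConnectPlugin implements Spark's generic SparkPlugin interface, it is enabled through the standard spark.plugins property rather than a Connect-specific setting, for example in spark-defaults.conf:

spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin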

Plugin Configuration

Plugins are configured through Spark configuration properties:

// From Connect configuration object
val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]

Usage Examples

Creating a Custom Relation Plugin

import org.apache.spark.sql.connect.plugin.RelationPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.google.protobuf.Any

class MyCustomRelationPlugin extends RelationPlugin {
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[LogicalPlan] = {
    // Check if this is our custom relation type
    if (relation.is(classOf[MyCustomRelation])) {
      val customRel = relation.unpack(classOf[MyCustomRelation])
      
      // Convert to Catalyst LogicalPlan
      val logicalPlan = createLogicalPlan(customRel, planner)
      Some(logicalPlan)
    } else {
      None
    }
  }
  
  private def createLogicalPlan(rel: MyCustomRelation, planner: SparkConnectPlanner): LogicalPlan = {
    // Implementation specific to your custom relation
    ???
  }
}

Creating a Custom Expression Plugin

import org.apache.spark.sql.connect.plugin.ExpressionPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.expressions.Expression
import com.google.protobuf.Any

class MyCustomExpressionPlugin extends ExpressionPlugin {
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[Expression] = {
    if (relation.is(classOf[MyCustomExpression])) {
      val customExpr = relation.unpack(classOf[MyCustomExpression])
      
      // Convert to Catalyst Expression
      val catalystExpr = createExpression(customExpr, planner)
      Some(catalystExpr)
    } else {
      None
    }
  }
  
  private def createExpression(expr: MyCustomExpression, planner: SparkConnectPlanner): Expression = {
    // Implementation specific to your custom expression
    ???
  }
}

Creating a Custom Command Plugin

import org.apache.spark.sql.connect.plugin.CommandPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import com.google.protobuf.Any

class MyCustomCommandPlugin extends CommandPlugin {
  override def process(command: Any, planner: SparkConnectPlanner): Option[Unit] = {
    if (command.is(classOf[MyCustomCommand])) {
      val customCmd = command.unpack(classOf[MyCustomCommand])
      
      // Execute the custom command
      executeCommand(customCmd, planner)
      Some(())
    } else {
      None
    }
  }
  
  private def executeCommand(cmd: MyCustomCommand, planner: SparkConnectPlanner): Unit = {
    // Implementation specific to your custom command
    ???
  }
}

Configuring Plugins

Configure your plugins through Spark configuration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.sql.extensions", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .config("spark.connect.extensions.relation.classes", "com.mycompany.MyCustomRelationPlugin")
  .config("spark.connect.extensions.expression.classes", "com.mycompany.MyCustomExpressionPlugin")
  .config("spark.connect.extensions.command.classes", "com.mycompany.MyCustomCommandPlugin")
  .getOrCreate()

Or via configuration files:

spark.connect.extensions.relation.classes=com.mycompany.MyCustomRelationPlugin,com.mycompany.AnotherRelationPlugin
spark.connect.extensions.expression.classes=com.mycompany.MyCustomExpressionPlugin
spark.connect.extensions.command.classes=com.mycompany.MyCustomCommandPlugin

Plugin Lifecycle

  1. Registration: Plugins are registered during server startup based on configuration
  2. Discovery: The registry loads plugin classes using reflection
  3. Instantiation: Plugin instances are created and cached
  4. Invocation: Plugins are called during plan processing in registration order
  5. Chain Processing: If a plugin returns None, the next plugin in the chain is tried
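
The chain behavior amounts to a first-match traversal over the registered plugins. The following is a simplified sketch of that dispatch logic, not the actual planner code:

def transformWithPlugins(
    relation: com.google.protobuf.Any,
    planner: SparkConnectPlanner,
    plugins: Seq[RelationPlugin]): LogicalPlan = {
  // Try each plugin in registration order; the first Some wins.
  plugins.view
    .flatMap(plugin => plugin.transform(relation, planner))
    .headOption
    .getOrElse(throw new UnsupportedOperationException(
      s"No registered plugin handled extension ${relation.getTypeUrl}"))
}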

Best Practices

Plugin Implementation

  • Always check the protocol buffer type before processing
  • Return None for unrecognized message types
  • Use the planner parameter for accessing conversion utilities
  • Handle exceptions gracefully and log appropriate error messages
  • Maintain thread safety as plugins may be called concurrently

Error Handling

  • Validate input protocol buffer messages thoroughly
  • Provide meaningful error messages for debugging
  • Use appropriate Spark exception types
  • Consider performance implications of plugin processing
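
For example, a plugin might validate the unpacked message up front and fail with a descriptive error instead of letting a malformed plan propagate. A sketch, where MyCustomRelation and its table_name field are hypothetical:

override def transform(relation: Any, planner: SparkConnectPlanner): Option[LogicalPlan] = {
  if (!relation.is(classOf[MyCustomRelation])) return None
  val rel = relation.unpack(classOf[MyCustomRelation])
  // Fail fast with a message the client can act on.
  if (rel.getTableName.isEmpty) {
    throw new IllegalArgumentException(
      "MyCustomRelation.table_name must be non-empty")
  }
  Some(createLogicalPlan(rel, planner))
}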

Testing

  • Unit test plugin logic independently
  • Integration test with the full Connect server
  • Test with various protocol buffer message types
  • Verify plugin chain behavior when multiple plugins are registered
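
As a concrete starting point, the minimal sketch below unit-tests the None path for unrecognized message types. It assumes ScalaTest and the hypothetical MyCustomRelationPlugin from the earlier example:

import org.scalatest.funsuite.AnyFunSuite
import com.google.protobuf.{Any, StringValue}

class MyCustomRelationPluginSuite extends AnyFunSuite {
  test("returns None for unrecognized message types") {
    val plugin = new MyCustomRelationPlugin
    // Pack an unrelated well-known type; the plugin must not claim it.
    val unrelated = Any.pack(StringValue.of("not ours"))
    // The planner is never touched when the type check fails, so passing
    // null keeps the test independent of a running Connect server.
    assert(plugin.transform(unrelated, null).isEmpty)
  }
}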

Security Considerations

  • Validate all input from protocol buffer messages
  • Avoid executing arbitrary code based on user input
  • Use appropriate access controls for sensitive operations
  • Consider sandboxing for user-provided plugin code