The Spark Connect Server provides an extensible plugin architecture that allows custom extensions for relation processing, expression evaluation, and command handling. This system enables developers to extend the server's capabilities without modifying the core implementation.
Extends relation processing in the planner to support custom data sources and operations.
trait RelationPlugin {
  /**
   * Transform the incoming protobuf message into a Catalyst LogicalPlan.
   *
   * @param relation protobuf `Any` wrapping the custom relation definition
   * @param planner  the SparkConnectPlanner instance, exposing conversion utilities
   * @return Some(plan) when this plugin recognizes the message type, None otherwise
   *         (so the next plugin in the chain can be tried)
   */
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[LogicalPlan]
}

Parameters:
- relation: Protocol buffer message containing the custom relation definition
- planner: The SparkConnectPlanner instance for accessing conversion utilities

Returns:

- Some(LogicalPlan) if the plugin handles this relation type
- None if the plugin doesn't recognize the relation

Extends expression processing to support custom functions and operators.
trait ExpressionPlugin {
  /**
   * Transform the incoming protobuf message into a Catalyst Expression.
   *
   * NOTE: the parameter is named `relation` for consistency with the other
   * plugin interfaces, but it carries the custom expression payload.
   *
   * @param relation protobuf `Any` wrapping the custom expression definition
   * @param planner  the SparkConnectPlanner instance, exposing conversion utilities
   * @return Some(expression) when this plugin recognizes the message type, None otherwise
   */
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Expression]
}

Parameters:
- relation: Protocol buffer message containing the custom expression definition
- planner: The SparkConnectPlanner instance for accessing conversion utilities

Returns:

- Some(Expression) if the plugin handles this expression type
- None if the plugin doesn't recognize the expression

Extends command processing to support custom operations and administrative commands.
trait CommandPlugin {
  /**
   * Process the incoming protobuf command message, executing it for its side effects.
   *
   * @param command protobuf `Any` wrapping the custom command definition
   * @param planner the SparkConnectPlanner instance, exposing conversion utilities
   * @return Some(()) when the command was handled, None otherwise
   */
  def process(command: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Unit]
}

Parameters:
- command: Protocol buffer message containing the custom command definition
- planner: The SparkConnectPlanner instance for accessing conversion utilities

Returns:

- Some(Unit) if the plugin handles this command type
- None if the plugin doesn't recognize the command

Central registry for managing all plugin types.
object SparkConnectPluginRegistry {
  // All registered relation plugins, in configuration order.
  def relationRegistry: Seq[RelationPlugin]
  // All registered expression plugins, in configuration order.
  def expressionRegistry: Seq[ExpressionPlugin]
  // All registered command plugins, in configuration order.
  def commandRegistry: Seq[CommandPlugin]
  // Instantiates plugins reflectively from fully-qualified class names
  // taken from the configuration entries below.
  def createConfiguredPlugins[T](values: Seq[String]): Seq[T]
}

Properties:
- relationRegistry: All registered relation plugins
- expressionRegistry: All registered expression plugins
- commandRegistry: All registered command plugins

Methods:

- createConfiguredPlugins[T]: Creates plugin instances from configuration class names

Integrates the Connect server with Spark's plugin system.
class SparkConnectPlugin extends SparkPlugin {
  // Driver-side plugin hook (see org.apache.spark.api.plugin.SparkPlugin).
  def driverPlugin(): DriverPlugin
  // Executor-side plugin hook.
  def executorPlugin(): ExecutorPlugin
}

Plugins are configured through Spark configuration properties:
// From Connect configuration object: comma-separated lists of
// fully-qualified plugin class names, one entry per plugin kind.
val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]

import org.apache.spark.sql.connect.plugin.RelationPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.google.protobuf.Any
class MyCustomRelationPlugin extends RelationPlugin {
  /**
   * Handle our custom relation type; defer to the next plugin otherwise.
   *
   * @param relation protobuf `Any` carrying an arbitrary relation payload
   * @param planner  planner providing conversion utilities
   * @return Some(LogicalPlan) when the payload is a MyCustomRelation, None otherwise
   */
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[LogicalPlan] = {
    // classOf[...] is the idiomatic type token; it is equivalent to (and
    // simpler than) MyCustomRelation.getDefaultInstance.getClass.
    if (relation.is(classOf[MyCustomRelation])) {
      val customRel = relation.unpack(classOf[MyCustomRelation])
      // Convert to Catalyst LogicalPlan
      Some(createLogicalPlan(customRel, planner))
    } else {
      None
    }
  }

  // Builds the Catalyst plan for the unpacked message; implementation-specific.
  private def createLogicalPlan(rel: MyCustomRelation, planner: SparkConnectPlanner): LogicalPlan = {
    // Implementation specific to your custom relation
    ???
  }
}

import org.apache.spark.sql.connect.plugin.ExpressionPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.expressions.Expression
import com.google.protobuf.Any
class MyCustomExpressionPlugin extends ExpressionPlugin {
  /**
   * Handle our custom expression type; defer to the next plugin otherwise.
   *
   * @param relation protobuf `Any` carrying an arbitrary expression payload
   *                 (named `relation` to match the ExpressionPlugin interface)
   * @param planner  planner providing conversion utilities
   * @return Some(Expression) when the payload is a MyCustomExpression, None otherwise
   */
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[Expression] = {
    // classOf[...] is the idiomatic type token; it is equivalent to (and
    // simpler than) MyCustomExpression.getDefaultInstance.getClass.
    if (relation.is(classOf[MyCustomExpression])) {
      val customExpr = relation.unpack(classOf[MyCustomExpression])
      // Convert to Catalyst Expression
      Some(createExpression(customExpr, planner))
    } else {
      None
    }
  }

  // Builds the Catalyst expression for the unpacked message; implementation-specific.
  private def createExpression(expr: MyCustomExpression, planner: SparkConnectPlanner): Expression = {
    // Implementation specific to your custom expression
    ???
  }
}

import org.apache.spark.sql.connect.plugin.CommandPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import com.google.protobuf.Any
class MyCustomCommandPlugin extends CommandPlugin {
  /**
   * Execute our custom command; defer to the next plugin otherwise.
   *
   * @param command protobuf `Any` carrying an arbitrary command payload
   * @param planner planner providing conversion utilities
   * @return Some(()) when the command was handled, None otherwise
   */
  override def process(command: Any, planner: SparkConnectPlanner): Option[Unit] = {
    // classOf[...] is the idiomatic type token; it is equivalent to (and
    // simpler than) MyCustomCommand.getDefaultInstance.getClass.
    if (command.is(classOf[MyCustomCommand])) {
      val customCmd = command.unpack(classOf[MyCustomCommand])
      // Execute the custom command for its side effects.
      executeCommand(customCmd, planner)
      Some(())
    } else {
      None
    }
  }

  // Runs the unpacked command; implementation-specific.
  private def executeCommand(cmd: MyCustomCommand, planner: SparkConnectPlanner): Unit = {
    // Implementation specific to your custom command
    ???
  }
}

Configure your plugins through Spark configuration:
import org.apache.spark.sql.SparkSession

// SparkConnectPlugin implements org.apache.spark.api.plugin.SparkPlugin,
// which is loaded through `spark.plugins`. (`spark.sql.extensions` is the
// unrelated SparkSessionExtensions hook and will not load a SparkPlugin.)
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .config("spark.connect.extensions.relation.classes", "com.mycompany.MyCustomRelationPlugin")
  .config("spark.connect.extensions.expression.classes", "com.mycompany.MyCustomExpressionPlugin")
  .config("spark.connect.extensions.command.classes", "com.mycompany.MyCustomCommandPlugin")
  .getOrCreate()

Or via configuration files:
spark.connect.extensions.relation.classes=com.mycompany.MyCustomRelationPlugin,com.mycompany.AnotherRelationPlugin
spark.connect.extensions.expression.classes=com.mycompany.MyCustomExpressionPlugin
spark.connect.extensions.command.classes=com.mycompany.MyCustomCommandPlugin

When a plugin returns None, the next plugin in the chain is tried; plugins should return None for unrecognized message types.