# Plugin System

The Spark Connect Server provides an extensible plugin architecture that allows custom extensions for relation processing, expression evaluation, and command handling. This system enables developers to extend the server's capabilities without modifying the core implementation.
## Plugin Interfaces

### RelationPlugin

Extends relation processing in the planner to support custom data sources and operations.

```scala { .api }
trait RelationPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[LogicalPlan]
}
```

**Parameters:**
- `relation`: Protocol buffer message containing the custom relation definition
- `planner`: The `SparkConnectPlanner` instance for accessing conversion utilities

**Returns:**
- `Some(LogicalPlan)` if the plugin handles this relation type
- `None` if the plugin doesn't recognize the relation
### ExpressionPlugin

Extends expression processing to support custom functions and operators.

```scala { .api }
trait ExpressionPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Expression]
}
```

**Parameters:**
- `relation`: Protocol buffer message containing the custom expression definition
- `planner`: The `SparkConnectPlanner` instance for accessing conversion utilities

**Returns:**
- `Some(Expression)` if the plugin handles this expression type
- `None` if the plugin doesn't recognize the expression
### CommandPlugin

Extends command processing to support custom operations and administrative commands.

```scala { .api }
trait CommandPlugin {
  def process(command: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Unit]
}
```

**Parameters:**
- `command`: Protocol buffer message containing the custom command definition
- `planner`: The `SparkConnectPlanner` instance for accessing conversion utilities

**Returns:**
- `Some(Unit)` if the plugin handles this command type
- `None` if the plugin doesn't recognize the command
## Plugin Registry

### SparkConnectPluginRegistry

Central registry for managing all plugin types.

```scala { .api }
object SparkConnectPluginRegistry {
  def relationRegistry: Seq[RelationPlugin]
  def expressionRegistry: Seq[ExpressionPlugin]
  def commandRegistry: Seq[CommandPlugin]
  def createConfiguredPlugins[T](values: Seq[String]): Seq[T]
}
```

**Properties:**
- `relationRegistry`: All registered relation plugins
- `expressionRegistry`: All registered expression plugins
- `commandRegistry`: All registered command plugins

**Methods:**
- `createConfiguredPlugins[T]`: Creates plugin instances from the configured class names
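The reflective loading that `createConfiguredPlugins` performs can be sketched roughly as follows. This is an illustrative stand-alone version under the assumption that each plugin class has a public no-arg constructor, not the actual Spark implementation:

```scala
// Illustrative sketch: instantiate plugin classes from configured names via
// reflection, roughly what createConfiguredPlugins does.
// Not the actual Spark implementation.
def loadPlugins[T](classNames: Seq[String]): Seq[T] =
  classNames.map { name =>
    // Load the class by name and invoke its no-arg constructor
    Class.forName(name)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
  }
```

The class names come from the `spark.connect.extensions.*.classes` properties; with this sketch, a misspelled class name surfaces as a `ClassNotFoundException` at startup rather than at query time.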
## Main Plugin Integration

### SparkConnectPlugin

Integrates the Connect server with Spark's plugin system.

```scala { .api }
class SparkConnectPlugin extends SparkPlugin {
  def driverPlugin(): DriverPlugin
  def executorPlugin(): ExecutorPlugin
}
```
## Plugin Configuration

Plugins are configured through Spark configuration properties:

```scala { .api }
// From the Connect configuration object
val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]
```
## Usage Examples

### Creating a Custom Relation Plugin

```scala
import org.apache.spark.sql.connect.plugin.RelationPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.google.protobuf.Any

class MyCustomRelationPlugin extends RelationPlugin {
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[LogicalPlan] = {
    // Check if this is our custom relation type
    if (relation.is(classOf[MyCustomRelation])) {
      val customRel = relation.unpack(classOf[MyCustomRelation])

      // Convert to a Catalyst LogicalPlan
      val logicalPlan = createLogicalPlan(customRel, planner)
      Some(logicalPlan)
    } else {
      None
    }
  }

  private def createLogicalPlan(rel: MyCustomRelation, planner: SparkConnectPlanner): LogicalPlan = {
    // Implementation specific to your custom relation
    ???
  }
}
```
### Creating a Custom Expression Plugin

```scala
import org.apache.spark.sql.connect.plugin.ExpressionPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.expressions.Expression
import com.google.protobuf.Any

class MyCustomExpressionPlugin extends ExpressionPlugin {
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[Expression] = {
    if (relation.is(classOf[MyCustomExpression])) {
      val customExpr = relation.unpack(classOf[MyCustomExpression])

      // Convert to a Catalyst Expression
      val catalystExpr = createExpression(customExpr, planner)
      Some(catalystExpr)
    } else {
      None
    }
  }

  private def createExpression(expr: MyCustomExpression, planner: SparkConnectPlanner): Expression = {
    // Implementation specific to your custom expression
    ???
  }
}
```
### Creating a Custom Command Plugin

```scala
import org.apache.spark.sql.connect.plugin.CommandPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import com.google.protobuf.Any

class MyCustomCommandPlugin extends CommandPlugin {
  override def process(command: Any, planner: SparkConnectPlanner): Option[Unit] = {
    if (command.is(classOf[MyCustomCommand])) {
      val customCmd = command.unpack(classOf[MyCustomCommand])

      // Execute the custom command
      executeCommand(customCmd, planner)
      Some(())
    } else {
      None
    }
  }

  private def executeCommand(cmd: MyCustomCommand, planner: SparkConnectPlanner): Unit = {
    // Implementation specific to your custom command
    ???
  }
}
```
### Configuring Plugins

Configure your plugins through Spark configuration. Note that `SparkConnectPlugin` is a `SparkPlugin`, so it is loaded via `spark.plugins` rather than `spark.sql.extensions`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .config("spark.connect.extensions.relation.classes", "com.mycompany.MyCustomRelationPlugin")
  .config("spark.connect.extensions.expression.classes", "com.mycompany.MyCustomExpressionPlugin")
  .config("spark.connect.extensions.command.classes", "com.mycompany.MyCustomCommandPlugin")
  .getOrCreate()
```
Or via configuration files:

```properties
spark.connect.extensions.relation.classes=com.mycompany.MyCustomRelationPlugin,com.mycompany.AnotherRelationPlugin
spark.connect.extensions.expression.classes=com.mycompany.MyCustomExpressionPlugin
spark.connect.extensions.command.classes=com.mycompany.MyCustomCommandPlugin
```
## Plugin Lifecycle

1. **Registration**: Plugins are registered during server startup based on configuration
2. **Discovery**: The registry loads plugin classes using reflection
3. **Instantiation**: Plugin instances are created and cached
4. **Invocation**: Plugins are called during plan processing in registration order
5. **Chain Processing**: If a plugin returns `None`, the next plugin in the chain is tried
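The invocation and chain-processing steps can be sketched as a first-match scan over the registered plugins. The helper below is hypothetical, not the actual Spark dispatch code:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative first-match dispatch: try each registered relation plugin in
// registration order and stop at the first one that returns Some(...).
// Hypothetical helper, not the actual Spark code.
def transformWithPlugins(
    relation: com.google.protobuf.Any,
    plugins: Seq[RelationPlugin],
    planner: SparkConnectPlanner): Option[LogicalPlan] =
  // The lazy view ensures later plugins are not invoked once one matches
  plugins.view.flatMap(_.transform(relation, planner)).headOption
```

If every plugin returns `None`, the overall result is `None` and the server can report the extension relation as unsupported.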
## Best Practices

### Plugin Implementation

- Always check the protocol buffer type before processing
- Return `None` for unrecognized message types
- Use the `planner` parameter for accessing conversion utilities
- Handle exceptions gracefully and log appropriate error messages
- Maintain thread safety, as plugins may be called concurrently
### Error Handling

- Validate input protocol buffer messages thoroughly
- Provide meaningful error messages for debugging
- Use appropriate Spark exception types
- Consider the performance implications of plugin processing
### Testing

- Unit test plugin logic independently
- Integration test with the full Connect server
- Test with various protocol buffer message types
- Verify plugin chain behavior when multiple plugins are registered
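A minimal unit test for the "return `None` for unrecognized types" rule might look like the following sketch, assuming ScalaTest; `SomeOtherMessage` stands in for any protobuf message your plugin does not handle:

```scala
import com.google.protobuf.Any
import org.scalatest.funsuite.AnyFunSuite

class MyCustomRelationPluginSuite extends AnyFunSuite {
  test("ignores unrecognized message types") {
    val plugin = new MyCustomRelationPlugin
    // Pack a message the plugin does not handle; it must decline with None.
    // SomeOtherMessage is a placeholder for one of your own protobuf types.
    val unrelated = Any.pack(SomeOtherMessage.getDefaultInstance)
    // A well-behaved plugin returns None before ever touching the planner,
    // so passing null here is safe for this particular test
    assert(plugin.transform(unrelated, planner = null) === None)
  }
}
```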
## Security Considerations

- Validate all input from protocol buffer messages
- Avoid executing arbitrary code based on user input
- Use appropriate access controls for sensitive operations
- Consider sandboxing for user-provided plugin code