# Plugin System

The Spark Connect Server provides an extensible plugin architecture that allows custom extensions for relation processing, expression evaluation, and command handling. This system enables developers to extend the server's capabilities without modifying the core implementation.

## Plugin Interfaces

### RelationPlugin

Extends relation processing in the planner to support custom data sources and operations.

```scala { .api }
trait RelationPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[LogicalPlan]
}
```

**Parameters:**

- `relation`: Protocol buffer message containing the custom relation definition
- `planner`: The `SparkConnectPlanner` instance for accessing conversion utilities

**Returns:**

- `Some(LogicalPlan)` if the plugin handles this relation type
- `None` if the plugin does not recognize the relation

### ExpressionPlugin

Extends expression processing to support custom functions and operators.

```scala { .api }
trait ExpressionPlugin {
  def transform(relation: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Expression]
}
```

**Parameters:**

- `relation`: Protocol buffer message containing the custom expression definition
- `planner`: The `SparkConnectPlanner` instance for accessing conversion utilities

**Returns:**

- `Some(Expression)` if the plugin handles this expression type
- `None` if the plugin does not recognize the expression

### CommandPlugin

Extends command processing to support custom operations and administrative commands.

```scala { .api }
trait CommandPlugin {
  def process(command: com.google.protobuf.Any, planner: SparkConnectPlanner): Option[Unit]
}
```

**Parameters:**

- `command`: Protocol buffer message containing the custom command definition
- `planner`: The `SparkConnectPlanner` instance for accessing conversion utilities

**Returns:**

- `Some(Unit)` if the plugin handles this command type
- `None` if the plugin does not recognize the command

## Plugin Registry

### SparkConnectPluginRegistry

Central registry for managing all plugin types.

```scala { .api }
object SparkConnectPluginRegistry {
  def relationRegistry: Seq[RelationPlugin]
  def expressionRegistry: Seq[ExpressionPlugin]
  def commandRegistry: Seq[CommandPlugin]
  def createConfiguredPlugins[T](values: Seq[String]): Seq[T]
}
```

**Properties:**

- `relationRegistry`: All registered relation plugins
- `expressionRegistry`: All registered expression plugins
- `commandRegistry`: All registered command plugins

**Methods:**

- `createConfiguredPlugins[T]`: Creates plugin instances from the class names given in configuration
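As a rough sketch (not the actual Spark implementation), `createConfiguredPlugins` can be pictured as reflection-based instantiation of the configured class names, assuming each plugin class exposes a public zero-argument constructor:

```scala
// Hypothetical sketch of reflection-based plugin instantiation,
// assuming each configured class has a public zero-argument constructor.
def createConfiguredPlugins[T](values: Seq[String]): Seq[T] =
  values.map { className =>
    Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
  }
```

Under this model, a misconfigured class name surfaces as a `ClassNotFoundException` during server startup rather than at query time.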

## Main Plugin Integration

### SparkConnectPlugin

Integrates the Connect server with Spark's plugin system.

```scala { .api }
class SparkConnectPlugin extends SparkPlugin {
  def driverPlugin(): DriverPlugin
  def executorPlugin(): ExecutorPlugin
}
```

## Plugin Configuration

Plugins are configured through Spark configuration properties:

```scala { .api }
// From the Connect configuration object
val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]
```

## Usage Examples

### Creating a Custom Relation Plugin

```scala
import org.apache.spark.sql.connect.plugin.RelationPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import com.google.protobuf.Any

class MyCustomRelationPlugin extends RelationPlugin {
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[LogicalPlan] = {
    // Check whether this is our custom relation type
    if (relation.is(classOf[MyCustomRelation])) {
      val customRel = relation.unpack(classOf[MyCustomRelation])

      // Convert to a Catalyst LogicalPlan
      Some(createLogicalPlan(customRel, planner))
    } else {
      None
    }
  }

  private def createLogicalPlan(rel: MyCustomRelation, planner: SparkConnectPlanner): LogicalPlan = {
    // Implementation specific to your custom relation
    ???
  }
}
```

### Creating a Custom Expression Plugin

```scala
import org.apache.spark.sql.connect.plugin.ExpressionPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.catalyst.expressions.Expression
import com.google.protobuf.Any

class MyCustomExpressionPlugin extends ExpressionPlugin {
  override def transform(relation: Any, planner: SparkConnectPlanner): Option[Expression] = {
    if (relation.is(classOf[MyCustomExpression])) {
      val customExpr = relation.unpack(classOf[MyCustomExpression])

      // Convert to a Catalyst Expression
      Some(createExpression(customExpr, planner))
    } else {
      None
    }
  }

  private def createExpression(expr: MyCustomExpression, planner: SparkConnectPlanner): Expression = {
    // Implementation specific to your custom expression
    ???
  }
}
```

### Creating a Custom Command Plugin

```scala
import org.apache.spark.sql.connect.plugin.CommandPlugin
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import com.google.protobuf.Any

class MyCustomCommandPlugin extends CommandPlugin {
  override def process(command: Any, planner: SparkConnectPlanner): Option[Unit] = {
    if (command.is(classOf[MyCustomCommand])) {
      val customCmd = command.unpack(classOf[MyCustomCommand])

      // Execute the custom command
      executeCommand(customCmd, planner)
      Some(())
    } else {
      None
    }
  }

  private def executeCommand(cmd: MyCustomCommand, planner: SparkConnectPlanner): Unit = {
    // Implementation specific to your custom command
    ???
  }
}
```

### Configuring Plugins

Configure your plugins through Spark configuration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  .config("spark.connect.extensions.relation.classes", "com.mycompany.MyCustomRelationPlugin")
  .config("spark.connect.extensions.expression.classes", "com.mycompany.MyCustomExpressionPlugin")
  .config("spark.connect.extensions.command.classes", "com.mycompany.MyCustomCommandPlugin")
  .getOrCreate()
```

Or via configuration files:

```properties
spark.connect.extensions.relation.classes=com.mycompany.MyCustomRelationPlugin,com.mycompany.AnotherRelationPlugin
spark.connect.extensions.expression.classes=com.mycompany.MyCustomExpressionPlugin
spark.connect.extensions.command.classes=com.mycompany.MyCustomCommandPlugin
```
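When the plugins run on a standalone Connect server, the same properties can be supplied at startup. A sketch, assuming the plugin classes are packaged in `my-plugins.jar` (a hypothetical jar name):

```shell
# Start the Connect server with the plugin jar on the classpath and
# the extension classes configured. my-plugins.jar is hypothetical.
./sbin/start-connect-server.sh \
  --jars my-plugins.jar \
  --conf spark.connect.extensions.relation.classes=com.mycompany.MyCustomRelationPlugin \
  --conf spark.connect.extensions.expression.classes=com.mycompany.MyCustomExpressionPlugin \
  --conf spark.connect.extensions.command.classes=com.mycompany.MyCustomCommandPlugin
```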

## Plugin Lifecycle

1. **Registration**: Plugins are registered during server startup based on configuration
2. **Discovery**: The registry loads plugin classes using reflection
3. **Instantiation**: Plugin instances are created and cached
4. **Invocation**: Plugins are called during plan processing in registration order
5. **Chain Processing**: If a plugin returns `None`, the next plugin in the chain is tried
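The chain-processing step can be illustrated with a small self-contained sketch; `firstHandled` and the toy handlers below are hypothetical names standing in for the registry's plugin list and the `transform`/`process` calls:

```scala
// Chain-processing sketch: try each handler in registration order,
// lazily, and return the first Some(...) result.
def firstHandled[A, B](handlers: Seq[A => Option[B]], input: A): Option[B] =
  handlers.view.flatMap(h => h(input)).headOption

// Two toy "plugins": one recognizes negative numbers, one even numbers.
val handlers: Seq[Int => Option[String]] = Seq(
  n => if (n < 0) Some("negative") else None,
  n => if (n % 2 == 0) Some("even") else None
)
```

With this chain, `-2` is claimed by the first handler even though the second would also match, mirroring how the first registered plugin to return `Some(...)` wins; an input no handler recognizes yields `None`.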

## Best Practices

### Plugin Implementation

- Always check the protocol buffer type before processing
- Return `None` for unrecognized message types
- Use the `planner` parameter for accessing conversion utilities
- Handle exceptions gracefully and log appropriate error messages
- Maintain thread safety, as plugins may be called concurrently

### Error Handling

- Validate input protocol buffer messages thoroughly
- Provide meaningful error messages for debugging
- Use appropriate Spark exception types
- Consider the performance implications of plugin processing

### Testing

243

244

- Unit test plugin logic independently

245

- Integration test with the full Connect server

246

- Test with various protocol buffer message types

247

- Verify plugin chain behavior when multiple plugins are registered

248

249

## Security Considerations

- Validate all input from protocol buffer messages
- Avoid executing arbitrary code based on user input
- Use appropriate access controls for sensitive operations
- Consider sandboxing for user-provided plugin code