# Plan Processing

The plan processing system converts protocol buffer messages from Connect clients into Catalyst logical plans that Spark can execute. Processing covers relation transformation, expression conversion, and command handling.

## Core Planner

### SparkConnectPlanner

The main planner class; it converts protocol buffer plans into Catalyst plans.

```scala { .api }
class SparkConnectPlanner(sessionHolder: SessionHolder) {
  def transformRelation(rel: proto.Relation): LogicalPlan
  def transformExpression(exp: proto.Expression): Expression
  def process(
      command: proto.Command,
      responseObserver: StreamObserver[proto.ExecutePlanResponse],
      executeHolder: ExecuteHolder): Unit
}
```

**Parameters:**

- `sessionHolder`: The session context for plan processing

**Key Methods:**

- `transformRelation`: Converts a protocol buffer relation to a Catalyst `LogicalPlan`
- `transformExpression`: Converts a protocol buffer expression to a Catalyst `Expression`
- `process`: Processes a protocol buffer command, streaming responses to the client

## Plan Execution

### SparkConnectPlanExecution

Manages the execution lifecycle of Spark Connect plans.

```scala { .api }
class SparkConnectPlanExecution(
    executeHolder: ExecuteHolder,
    sessionHolder: SessionHolder,
    responseObserver: StreamObserver[proto.ExecutePlanResponse],
    request: proto.ExecutePlanRequest) {
  // Execution lifecycle methods (internal implementation)
}
```

## Expression and Type Converters

### LiteralExpressionProtoConverter

Converts protocol buffer literals to Catalyst values and translates between protobuf and Catalyst data types.

```scala { .api }
object LiteralExpressionProtoConverter {
  def toCatalystValue(literal: proto.Expression.Literal): Any
  def toConnectProtoType(dt: DataType): proto.DataType
  def toCatalystType(dt: proto.DataType): DataType
}
```

**Key Methods:**

- `toCatalystValue`: Converts a protobuf literal to a Catalyst value
- `toConnectProtoType`: Converts a Catalyst `DataType` to a protobuf `DataType`
- `toCatalystType`: Converts a protobuf `DataType` to a Catalyst `DataType`

### SaveModeConverter

Converts between protocol buffer save modes and Spark `SaveMode` values.

```scala { .api }
object SaveModeConverter {
  def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode
  def toProto(mode: SaveMode): proto.WriteOperation.SaveMode
}
```

### TableSaveMethodConverter

Converts protocol buffer table save methods to their string representation.

```scala { .api }
object TableSaveMethodConverter {
  def toSaveMethod(method: proto.WriteOperation.SaveTable.TableSaveMethod): String
}
```
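A minimal sketch of the conversion, in the style of the `SaveModeConverter` example later in this document (the `TABLE_SAVE_METHOD_SAVE_AS_TABLE` enum value is assumed from the Spark Connect protocol definitions):

```scala
import org.apache.spark.sql.connect.planner.TableSaveMethodConverter
import org.apache.spark.connect.proto

// Convert a protobuf table save method to its string form
// (the enum value is an assumption from the Spark Connect proto definitions)
val protoMethod = proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE
val methodName: String = TableSaveMethodConverter.toSaveMethod(protoMethod)
```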

## Execution Infrastructure

### ExecuteHolder

Holds execution state and manages the execution lifecycle.

```scala { .api }
class ExecuteHolder(
    executeId: String,
    request: proto.ExecutePlanRequest,
    sessionHolder: SessionHolder) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  // Additional state management methods (internal)
}
```

### ExecuteThreadRunner

Manages execution in separate threads for concurrent processing.

```scala { .api }
class ExecuteThreadRunner(
    executeHolder: ExecuteHolder,
    responseObserver: ExecuteResponseObserver[proto.ExecutePlanResponse])
  extends Runnable {
  def run(): Unit
}
```
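Since `ExecuteThreadRunner` is a plain `Runnable`, dispatching an execution onto its own thread follows the standard JVM pattern; a sketch (the `executeHolder` and `observer` values are placeholders from earlier setup, and the thread name is illustrative):

```scala
// Run the execution on a dedicated thread so the gRPC handler thread
// is not blocked while the plan executes
val runner = new ExecuteThreadRunner(executeHolder, observer)
val thread = new Thread(runner, s"SparkConnectExecuteThread-${executeHolder.executeId}")
thread.start()
```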

### ExecuteResponseObserver

Observes and streams execution responses to clients.

```scala { .api }
class ExecuteResponseObserver[T](
    userId: String,
    sessionId: String,
    responseObserver: StreamObserver[T])
  extends StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}
```
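Because `ExecuteResponseObserver[T]` itself extends `StreamObserver[T]`, it can wrap the raw gRPC observer and be passed anywhere a `StreamObserver` is expected. A sketch under that assumption (`userId`, `sessionId`, and `grpcObserver` are placeholders):

```scala
import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto

// Wrap the raw gRPC observer so responses can be tracked per execution
val observer = new ExecuteResponseObserver[proto.ExecutePlanResponse](
  userId, sessionId, grpcObserver)

// Standard StreamObserver protocol: zero or more onNext calls,
// then exactly one of onCompleted or onError
observer.onNext(proto.ExecutePlanResponse.newBuilder().build())
observer.onCompleted()
```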

### ExecuteGrpcResponseSender

Sends execution responses to the client over the gRPC stream.

```scala { .api }
class ExecuteGrpcResponseSender(
    executeHolder: ExecuteHolder,
    responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def sendResponse(response: proto.ExecutePlanResponse): Unit
  def sendError(error: Throwable): Unit
  def sendCompleted(): Unit
}
```
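A sketch of the send path implied by the interface above (the `executeHolder` and `grpcObserver` values are placeholders; the error is a plain `Throwable` for illustration):

```scala
import org.apache.spark.connect.proto

val sender = new ExecuteGrpcResponseSender(executeHolder, grpcObserver)

// Happy path: stream each response, then signal completion
sender.sendResponse(proto.ExecutePlanResponse.newBuilder().build())
sender.sendCompleted()

// Failure path: propagate the error to the client instead
// sender.sendError(new RuntimeException("plan execution failed"))
```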

### CachedStreamResponse

Caches streamed responses so that reattachable executions can replay them after a client reconnects.

```scala { .api }
class CachedStreamResponse(
    executeId: String,
    maxCacheSize: Int = 1000) {
  def addResponse(response: proto.ExecutePlanResponse): Unit
  def getResponses(fromIndex: Int): Seq[proto.ExecutePlanResponse]
  def size: Int
}
```
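Based on the interface above, a reattach flow might look like the following sketch (the `executeId` value, the empty response, and the index bookkeeping are placeholders):

```scala
import org.apache.spark.connect.proto

// Cache responses as they are produced during execution
val cache = new CachedStreamResponse(executeId = "exec-1234")
cache.addResponse(proto.ExecutePlanResponse.newBuilder().build())

// After a client reattaches, replay everything from the last index it acknowledged
val lastSeenIndex = 0
val toReplay = cache.getResponses(fromIndex = lastSeenIndex)
```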

## Streaming Support

### StreamingForeachBatchHelper

Helper for streaming `foreachBatch` operations.

```scala { .api }
object StreamingForeachBatchHelper {
  def foreachBatch(
      pythonExec: proto.PythonUDF,
      sessionHolder: SessionHolder): (Dataset[Row], Long) => Unit
}
```
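The returned `(Dataset[Row], Long) => Unit` function has the same shape as the standard `DataStreamWriter.foreachBatch` callback, so it can be attached directly to a streaming write; a sketch (the `pythonUdf`, `sessionHolder`, and `df` values are placeholders):

```scala
import org.apache.spark.sql.{Dataset, Row}

// Build the per-batch function from the client-supplied Python UDF
val batchFn: (Dataset[Row], Long) => Unit =
  StreamingForeachBatchHelper.foreachBatch(pythonUdf, sessionHolder)

// Attach it to a streaming write, as with any foreachBatch callback
df.writeStream.foreachBatch(batchFn).start()
```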

### StreamingQueryListenerHelper

Helper for managing streaming query listeners.

```scala { .api }
object StreamingQueryListenerHelper {
  def addListener(
      query: StreamingQuery,
      listener: proto.StreamingQueryListener,
      sessionHolder: SessionHolder): StreamingQueryListener
}
```
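A hedged sketch of registering a client-defined listener on a running query (`query`, `protoListener`, and `sessionHolder` are placeholders; the returned server-side `StreamingQueryListener` is the object Spark actually invokes):

```scala
// Translate the client's listener definition into a server-side listener
val serverListener = StreamingQueryListenerHelper.addListener(
  query, protoListener, sessionHolder)

// Keep a reference to the returned listener: it is what must later be
// deregistered, e.g. via spark.streams.removeListener(serverListener)
```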

## Usage Examples

### Basic Plan Processing

```scala
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto

// Create a planner instance
val planner = new SparkConnectPlanner(sessionHolder)

// Transform a relation
val protoRelation: proto.Relation = // ... from client request
val logicalPlan = planner.transformRelation(protoRelation)

// Transform an expression
val protoExpression: proto.Expression = // ... from client request
val catalystExpr = planner.transformExpression(protoExpression)
```

### Processing with Response Streaming

```scala
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto
import io.grpc.stub.StreamObserver

// Set up the response observer
val responseObserver: StreamObserver[proto.ExecutePlanResponse] = // ... from gRPC

// Create an execute holder for state management
val executeHolder = new ExecuteHolder(executeId, request, sessionHolder)

// Process the command, streaming responses back to the client
val planner = new SparkConnectPlanner(sessionHolder)
planner.process(request.getPlan.getCommand, responseObserver, executeHolder)
```

### Custom Expression Conversion

```scala
import org.apache.spark.sql.connect.planner.LiteralExpressionProtoConverter
import org.apache.spark.connect.proto
import org.apache.spark.sql.types.{DataType, StringType}

// Convert a protobuf literal to a Catalyst value
val protoLiteral: proto.Expression.Literal = // ... from client
val catalystValue = LiteralExpressionProtoConverter.toCatalystValue(protoLiteral)

// Convert a Catalyst DataType to protobuf
val catalystType: DataType = StringType
val protoType = LiteralExpressionProtoConverter.toConnectProtoType(catalystType)
```

### Save Operation Configuration

```scala
import org.apache.spark.sql.connect.planner.SaveModeConverter
import org.apache.spark.connect.proto
import org.apache.spark.sql.SaveMode

// Convert a protobuf save mode to a Spark SaveMode
val protoSaveMode = proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
val sparkSaveMode: SaveMode = SaveModeConverter.toSaveMode(protoSaveMode)

// Convert a Spark SaveMode back to protobuf
val backToProto = SaveModeConverter.toProto(sparkSaveMode)
```

## Error Handling

Plan processing uses structured error handling to return meaningful error messages to clients:

- **Validation Errors**: Invalid protocol buffer structure or unsupported operations
- **Conversion Errors**: Failures converting between protobuf and Catalyst representations
- **Execution Errors**: Runtime errors during plan execution
- **Resource Errors**: Memory limits, timeouts, or resource exhaustion

All errors are converted to appropriate gRPC status codes and include context about the failing operation.
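As a hedged illustration of that mapping (the category-to-status assignments below are illustrative, not the exact ones the server uses):

```scala
import io.grpc.Status

// Map a failure from plan processing onto a gRPC status for the client.
// The server's actual mapping is internal; this sketch only shows the shape.
def toGrpcError(t: Throwable): io.grpc.StatusRuntimeException = t match {
  case e: IllegalArgumentException => // validation / conversion problems
    Status.INVALID_ARGUMENT.withDescription(e.getMessage).withCause(e).asRuntimeException()
  case e => // execution and resource errors
    Status.INTERNAL.withDescription(e.getMessage).withCause(e).asRuntimeException()
}
```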

## Performance Considerations

### Plan Caching

- Logical plans may be cached to avoid repeated conversion overhead
- Expression conversion results are cached for common operations
- Plugin results are cached when applicable

### Streaming Optimization

- Large result sets use streaming responses to avoid memory issues
- Reattachable executions cache intermediate results for fault tolerance
- Response batching reduces network overhead

### Concurrent Processing

- Multiple client requests are processed concurrently
- Execution state management is thread-safe
- Sessions are isolated from one another's resources

## Extension Points

The plan processing system provides several extension points:

1. **Plugin System**: Custom relations, expressions, and commands via plugin interfaces
2. **Custom Converters**: Extended type conversion for domain-specific data types
3. **Execution Hooks**: Custom processing during the plan execution lifecycle
4. **Response Processing**: Custom response formatting and streaming logic