# Plan Processing

The plan processing system converts protocol buffer messages from Connect clients into Catalyst logical plans that can be executed by Spark. This includes relation transformation, expression conversion, and command processing.

## Core Planner

### SparkConnectPlanner

Main planner class that handles conversion from protocol buffer plans to Catalyst plans.

```scala { .api }
class SparkConnectPlanner(sessionHolder: SessionHolder) {
  def transformRelation(rel: proto.Relation): LogicalPlan
  def transformExpression(exp: proto.Expression): Expression
  def process(
      command: proto.Command,
      responseObserver: StreamObserver[proto.ExecutePlanResponse],
      executeHolder: ExecuteHolder): Unit
}
```

**Parameters:**

- `sessionHolder`: The session context for plan processing

**Key Methods:**

- `transformRelation`: Converts a protocol buffer relation to a Catalyst `LogicalPlan`
- `transformExpression`: Converts a protocol buffer expression to a Catalyst `Expression`
- `process`: Processes a protocol buffer command, streaming responses to the client

## Plan Execution

### SparkConnectPlanExecution

Manages the execution lifecycle of Spark Connect plans.

```scala { .api }
class SparkConnectPlanExecution(
    executeHolder: ExecuteHolder,
    sessionHolder: SessionHolder,
    responseObserver: StreamObserver[proto.ExecutePlanResponse],
    request: proto.ExecutePlanRequest
) {
  // Execution lifecycle methods (internal implementation)
}
```

## Expression and Type Converters

### LiteralExpressionProtoConverter

Converts protocol buffer literals and data types to their Catalyst counterparts.

```scala { .api }
object LiteralExpressionProtoConverter {
  def toCatalystValue(literal: proto.Expression.Literal): Any
  def toConnectProtoType(dt: DataType): proto.DataType
  def toCatalystType(dt: proto.DataType): DataType
}
```

**Key Methods:**

- `toCatalystValue`: Converts a protobuf literal to a Catalyst value
- `toConnectProtoType`: Converts a Catalyst `DataType` to a protobuf `DataType`
- `toCatalystType`: Converts a protobuf `DataType` to a Catalyst `DataType`

### SaveModeConverter

Converts between protocol buffer save modes and Spark `SaveMode` values.

```scala { .api }
object SaveModeConverter {
  def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode
  def toProto(mode: SaveMode): proto.WriteOperation.SaveMode
}
```

### TableSaveMethodConverter

Converts protocol buffer table save methods to their string representation.

```scala { .api }
object TableSaveMethodConverter {
  def toSaveMethod(method: proto.WriteOperation.SaveTable.TableSaveMethod): String
}
```

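A minimal usage sketch. The enum constant `TABLE_SAVE_METHOD_SAVE_AS_TABLE` follows the protocol's naming convention but should be checked against the generated proto classes, and the import path mirrors the convention used in the usage examples below.

```scala
import org.apache.spark.sql.connect.planner.TableSaveMethodConverter
import org.apache.spark.connect.proto

// Translate the client's table save method into its string form.
// The enum constant below is an assumption based on the protocol's naming convention.
val protoMethod = proto.WriteOperation.SaveTable.TableSaveMethod.TABLE_SAVE_METHOD_SAVE_AS_TABLE
val saveMethod: String = TableSaveMethodConverter.toSaveMethod(protoMethod)
```
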
## Execution Infrastructure

### ExecuteHolder

Holds execution state and manages the execution lifecycle.

```scala { .api }
class ExecuteHolder(
    executeId: String,
    request: proto.ExecutePlanRequest,
    sessionHolder: SessionHolder
) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  // Additional state management methods (internal)
}
```

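A short sketch of how a holder ties an incoming request to its session; `executeId`, `request`, and `sessionHolder` are assumed to come from the surrounding gRPC handler.

```scala
// Bind one ExecutePlanRequest to its session for the lifetime of the execution.
val executeHolder = new ExecuteHolder(executeId, request, sessionHolder)

// Downstream components read execution state through the holder's accessors.
val id = executeHolder.executeId
val originalRequest = executeHolder.request
```
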
### ExecuteThreadRunner

Manages execution in separate threads for concurrent processing.

```scala { .api }
class ExecuteThreadRunner(
    executeHolder: ExecuteHolder,
    responseObserver: ExecuteResponseObserver[proto.ExecutePlanResponse]
) extends Runnable {
  def run(): Unit
}
```

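Because the runner is a `Runnable`, a minimal sketch is to hand it to a dedicated thread; the thread name and daemon flag are illustrative choices, not part of the API.

```scala
// Run the execution off the gRPC handler thread.
val runner = new ExecuteThreadRunner(executeHolder, executeResponseObserver)
val thread = new Thread(runner, s"SparkConnectExecuteThread-${executeHolder.executeId}")
thread.setDaemon(true)
thread.start()

// Block for completion only if the caller needs synchronous semantics.
thread.join()
```
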
### ExecuteResponseObserver

Observes and streams execution responses to clients.

```scala { .api }
class ExecuteResponseObserver[T](
    userId: String,
    sessionId: String,
    responseObserver: StreamObserver[T]
) extends StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}
```

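A sketch of the observer in use, assuming `grpcObserver` is the raw gRPC `StreamObserver` for the call and `responses` is the sequence of messages produced by the execution: each response goes through `onNext`, and the stream ends with either `onCompleted` or `onError`.

```scala
import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto

// Wrap the raw gRPC observer so responses are tracked per user and session.
val observer =
  new ExecuteResponseObserver[proto.ExecutePlanResponse](userId, sessionId, grpcObserver)

// Success path: forward each result message, then close the stream.
responses.foreach(observer.onNext)
observer.onCompleted()

// Failure path (instead of onCompleted): surface the error to the client.
// observer.onError(exception)
```
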
### ExecuteGrpcResponseSender

Sends execution responses via gRPC streaming.

```scala { .api }
class ExecuteGrpcResponseSender(
    executeHolder: ExecuteHolder,
    responseObserver: StreamObserver[proto.ExecutePlanResponse]
) {
  def sendResponse(response: proto.ExecutePlanResponse): Unit
  def sendError(error: Throwable): Unit
  def sendCompleted(): Unit
}
```

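A minimal sketch of the send path, assuming `responses` is the sequence of `ExecutePlanResponse` messages produced by the execution; on failure the sender reports the error instead of completing the stream.

```scala
import scala.util.control.NonFatal

val sender = new ExecuteGrpcResponseSender(executeHolder, responseObserver)

try {
  // Stream each response message to the client, then signal completion.
  responses.foreach(sender.sendResponse)
  sender.sendCompleted()
} catch {
  case NonFatal(e) =>
    // Propagate the failure to the client instead of completing the stream.
    sender.sendError(e)
}
```
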
### CachedStreamResponse

Caches streaming responses for reattachable executions.

```scala { .api }
class CachedStreamResponse(
    executeId: String,
    maxCacheSize: Int = 1000
) {
  def addResponse(response: proto.ExecutePlanResponse): Unit
  def getResponses(fromIndex: Int): Seq[proto.ExecutePlanResponse]
  def size: Int
}
```

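A sketch of how the cache supports reattachment: responses are recorded as they are produced, and a reconnecting client is replayed everything past the last index it acknowledged. The `lastAcknowledgedIndex` value and the `sender` from the previous sketch are illustrative.

```scala
val cache = new CachedStreamResponse(executeHolder.executeId)

// Record each response as the execution produces it.
responses.foreach(cache.addResponse)

// On reattach, replay everything the client has not yet acknowledged.
val lastAcknowledgedIndex = 42 // illustrative; normally taken from the reattach request
val missed = cache.getResponses(lastAcknowledgedIndex)
missed.foreach(sender.sendResponse)
```
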
## Streaming Support

### StreamingForeachBatchHelper

Helper for streaming `foreachBatch` operations.

```scala { .api }
object StreamingForeachBatchHelper {
  def foreachBatch(
      pythonExec: proto.PythonUDF,
      sessionHolder: SessionHolder
  ): (Dataset[Row], Long) => Unit
}
```

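A sketch of wiring the returned function into a streaming write, assuming `protoUdf` is the client-supplied Python UDF message and `df` is the streaming Dataset being written; `DataStreamWriter.foreachBatch` accepts exactly a `(Dataset[Row], Long) => Unit` function.

```scala
import org.apache.spark.sql.{Dataset, Row}

// Build the per-batch callback from the client's Python UDF definition.
val batchFn: (Dataset[Row], Long) => Unit =
  StreamingForeachBatchHelper.foreachBatch(protoUdf, sessionHolder)

// Attach it to the streaming query exactly like a locally defined function.
val query = df.writeStream
  .foreachBatch(batchFn)
  .start()
```
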
### StreamingQueryListenerHelper

Helper for streaming query listener management.

```scala { .api }
object StreamingQueryListenerHelper {
  def addListener(
      query: StreamingQuery,
      listener: proto.StreamingQueryListener,
      sessionHolder: SessionHolder
  ): StreamingQueryListener
}
```

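A sketch of handling a client-defined listener, assuming `protoListener` is the listener definition from the request and `query` is the running `StreamingQuery`. Whether the returned listener still needs explicit registration with the session's `StreamingQueryManager` depends on the helper's internals, so the last step is shown only as a possibility.

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener

// Translate the client's listener definition into a server-side listener.
val serverListener: StreamingQueryListener =
  StreamingQueryListenerHelper.addListener(query, protoListener, sessionHolder)

// If the helper does not register it already, Spark's standard API would be:
// spark.streams.addListener(serverListener)
```
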
## Usage Examples

### Basic Plan Processing

```scala
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto

// Create planner instance
val planner = new SparkConnectPlanner(sessionHolder)

// Transform a relation
val protoRelation: proto.Relation = ??? // ... from client request
val logicalPlan = planner.transformRelation(protoRelation)

// Transform an expression
val protoExpression: proto.Expression = ??? // ... from client request
val catalystExpr = planner.transformExpression(protoExpression)
```

### Processing with Response Streaming

```scala
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.connect.proto
import io.grpc.stub.StreamObserver

// Set up response observer
val responseObserver: StreamObserver[proto.ExecutePlanResponse] = ??? // ... from gRPC

// Create execute holder for state management
val executeHolder = new ExecuteHolder(executeId, request, sessionHolder)

// Process command with streaming response
val planner = new SparkConnectPlanner(sessionHolder)
planner.process(request.getPlan.getCommand, responseObserver, executeHolder)
```

### Custom Expression Conversion

```scala
import org.apache.spark.sql.connect.planner.LiteralExpressionProtoConverter
import org.apache.spark.connect.proto
import org.apache.spark.sql.types.{DataType, StringType}

// Convert protobuf literal to Catalyst value
val protoLiteral: proto.Expression.Literal = ??? // ... from client
val catalystValue = LiteralExpressionProtoConverter.toCatalystValue(protoLiteral)

// Convert Catalyst DataType to protobuf
val catalystType: DataType = StringType
val protoType = LiteralExpressionProtoConverter.toConnectProtoType(catalystType)
```

### Save Operation Configuration

```scala
import org.apache.spark.sql.connect.planner.SaveModeConverter
import org.apache.spark.connect.proto

// Convert protobuf save mode to Spark SaveMode
val protoSaveMode = proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
val sparkSaveMode = SaveModeConverter.toSaveMode(protoSaveMode)

// Convert Spark SaveMode back to protobuf
val backToProto = SaveModeConverter.toProto(sparkSaveMode)
```

## Error Handling

Plan processing uses structured error handling to provide meaningful error messages to clients:

- **Validation Errors**: Invalid protocol buffer structure or unsupported operations
- **Conversion Errors**: Issues converting between protobuf and Catalyst representations
- **Execution Errors**: Runtime errors during plan execution
- **Resource Errors**: Memory limits, timeouts, or resource exhaustion

All errors are converted to appropriate gRPC status codes and include context about the failing operation, as sketched below.

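A sketch of that mapping, assuming the failure is surfaced through the gRPC stream observer; the specific `io.grpc.Status` codes chosen here are illustrative, not the server's actual mapping.

```scala
import io.grpc.Status
import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto

// Map a failure to a gRPC status and terminate the response stream.
def reportError(
    observer: StreamObserver[proto.ExecutePlanResponse],
    error: Throwable): Unit = {
  val status = error match {
    case e: IllegalArgumentException =>
      Status.INVALID_ARGUMENT.withDescription(e.getMessage).withCause(e)
    case e: UnsupportedOperationException =>
      Status.UNIMPLEMENTED.withDescription(e.getMessage).withCause(e)
    case e =>
      Status.INTERNAL.withDescription(e.getMessage).withCause(e)
  }
  observer.onError(status.asRuntimeException())
}
```
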
## Performance Considerations

### Plan Caching

- Logical plans may be cached to avoid repeated conversion overhead
- Expression conversion results are cached for common operations
- Plugin results are cached when applicable

### Streaming Optimization

- Large result sets use streaming responses to avoid memory issues
- Reattachable executions cache intermediate results for fault tolerance
- Response batching reduces network overhead

### Concurrent Processing

- Multiple client requests are processed concurrently
- Execution state is managed in a thread-safe way
- Resources are isolated between sessions

## Extension Points

The plan processing system provides several extension points:

1. **Plugin System**: Custom relations, expressions, and commands via plugin interfaces (see the sketch after this list)
2. **Custom Converters**: Extend type conversion for domain-specific data types
3. **Execution Hooks**: Custom processing during plan execution lifecycle
4. **Response Processing**: Custom response formatting and streaming logic

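As an illustration of the plugin extension point, a hypothetical relation plugin might look like the following; the class name, the method signature, and the `myproject.proto.MyDatasetRelation` message are assumptions for the sketch, not the actual plugin API.

```scala
import com.google.protobuf.{Any => ProtoAny}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connect.planner.SparkConnectPlanner

// Hypothetical shape of a relation plugin: inspect the extension message and,
// if it is recognized, produce a LogicalPlan; otherwise return None so other
// plugins (or the built-in planner) can handle it.
class MyDatasetRelationPlugin /* extends the relation plugin interface */ {
  def transform(relation: ProtoAny, planner: SparkConnectPlanner): Option[LogicalPlan] = {
    if (relation.is(classOf[myproject.proto.MyDatasetRelation])) {
      val rel = relation.unpack(classOf[myproject.proto.MyDatasetRelation])
      // Build the plan, reusing the planner for any nested child relations.
      Some(buildMyDatasetScan(rel, planner))
    } else {
      None
    }
  }

  private def buildMyDatasetScan(
      rel: myproject.proto.MyDatasetRelation,
      planner: SparkConnectPlanner): LogicalPlan = ???
}
```
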