# Server Management

This document covers the core server functionality, including lifecycle management, service configuration, and standalone deployment options.

## Core Server Classes

### SparkConnectService

The main gRPC service implementation, which handles all client requests.

```scala { .api }
class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  def executePlan(request: proto.ExecutePlanRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def analyzePlan(request: proto.AnalyzePlanRequest, responseObserver: StreamObserver[proto.AnalyzePlanResponse]): Unit
  def config(request: proto.ConfigRequest, responseObserver: StreamObserver[proto.ConfigResponse]): Unit
  def addArtifacts(responseObserver: StreamObserver[proto.AddArtifactsResponse]): StreamObserver[proto.AddArtifactsRequest]
  def artifactStatus(request: proto.ArtifactStatusesRequest, responseObserver: StreamObserver[proto.ArtifactStatusesResponse]): Unit
  def interrupt(request: proto.InterruptRequest, responseObserver: StreamObserver[proto.InterruptResponse]): Unit
  def reattachExecute(request: proto.ReattachExecuteRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def releaseExecute(request: proto.ReleaseExecuteRequest, responseObserver: StreamObserver[proto.ReleaseExecuteResponse]): Unit
}
```
22

### SparkConnectService Companion Object

The companion object provides server lifecycle management and session utilities.

```scala { .api }
object SparkConnectService {
  def start(sc: SparkContext): Unit
  def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def server: Server
  def uiTab: Option[SparkConnectServerTab]
}
```
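
As a sketch of the session utilities, server-side code can look up, or lazily create, the isolated session state backing a given user/session pair. The identifiers below are illustrative:

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Look up the SessionHolder for this (user, session) pair,
// creating an isolated session if one does not exist yet.
// The identifiers here are illustrative.
val holder = SparkConnectService.getOrCreateIsolatedSession("user-1", "session-abc-123")

// The holder wraps the isolated SparkSession used to execute this client's plans.
val session = holder.session
```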
38

### SparkConnectServer

Standalone server application entry point.

```scala { .api }
object SparkConnectServer {
  def main(args: Array[String]): Unit
}
```
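
In practice, the standalone server is usually launched via the launcher scripts bundled with the Spark distribution rather than by invoking `main` directly. A sketch, assuming a Spark 3.5.x distribution (adjust the `--packages` coordinates to your Spark and Scala versions):

```shell
# Start the standalone Spark Connect server.
# The package coordinates are illustrative; match them to your Spark version.
./sbin/start-connect-server.sh \
  --packages org.apache.spark:spark-connect_2.12:3.5.0

# Stop the server again.
./sbin/stop-connect-server.sh
```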

### SimpleSparkConnectService

A simplified service intended for testing and development.

```scala { .api }
object SimpleSparkConnectService {
  def main(args: Array[String]): Unit
}
```
58

## Request Handlers

The server delegates request processing to specialized handler classes, one per RPC:

### SparkConnectExecutePlanHandler

```scala { .api }
class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ExecutePlanRequest): Unit
}
```

### SparkConnectAnalyzeHandler

```scala { .api }
class SparkConnectAnalyzeHandler(responseObserver: StreamObserver[proto.AnalyzePlanResponse]) {
  def handle(request: proto.AnalyzePlanRequest): Unit
}
```

### SparkConnectConfigHandler

```scala { .api }
class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) {
  def handle(request: proto.ConfigRequest): Unit
}
```

### SparkConnectAddArtifactsHandler

```scala { .api }
class SparkConnectAddArtifactsHandler(responseObserver: StreamObserver[proto.AddArtifactsResponse]) {
  def handle(request: proto.AddArtifactsRequest): Unit
}
```

### SparkConnectArtifactStatusesHandler

```scala { .api }
class SparkConnectArtifactStatusesHandler(responseObserver: StreamObserver[proto.ArtifactStatusesResponse]) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}
```

### SparkConnectInterruptHandler

```scala { .api }
class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.InterruptResponse]) {
  def handle(request: proto.InterruptRequest): Unit
}
```

### SparkConnectReattachExecuteHandler

```scala { .api }
class SparkConnectReattachExecuteHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ReattachExecuteRequest): Unit
}
```

### SparkConnectReleaseExecuteHandler

```scala { .api }
class SparkConnectReleaseExecuteHandler(responseObserver: StreamObserver[proto.ReleaseExecuteResponse]) {
  def handle(request: proto.ReleaseExecuteRequest): Unit
}
```

## Interceptor System

The server supports gRPC interceptors for cross-cutting concerns such as request logging and per-request cleanup.

### SparkConnectInterceptorRegistry

```scala { .api }
object SparkConnectInterceptorRegistry {
  def chainInterceptors(sb: NettyServerBuilder): Unit
  def createConfiguredInterceptors(): Seq[ServerInterceptor]
}
```

### Built-in Interceptors

```scala { .api }
class LoggingInterceptor extends ServerInterceptor

class LocalPropertiesCleanupInterceptor extends ServerInterceptor
```
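
Custom interceptors can be added alongside the built-in ones: the registry instantiates each configured interceptor class and chains it onto the Netty server builder. A minimal sketch of a custom interceptor (the class name is illustrative, and the configuration key mentioned below should be verified against your Spark version):

```scala
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

// Illustrative interceptor: delegates every call onward unchanged.
// A real implementation might inspect headers or record metrics here.
class AuditInterceptor extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    next.startCall(call, headers)
  }
}
```

Configured interceptor classes are picked up from Spark configuration, e.g. `spark.connect.grpc.interceptor.classes=com.example.AuditInterceptor` (a comma-separated list of fully qualified class names).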
## Usage Examples

### Embedded Server

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Create a Spark session
val spark = SparkSession.builder()
  .appName("MyApp")
  .getOrCreate()

// Start the Connect server on this application's SparkContext.
// (Alternatively, setting spark.plugins=org.apache.spark.sql.connect.SparkConnectPlugin
// starts the server automatically when the application launches.)
SparkConnectService.start(spark.sparkContext)

// The server is now listening for client connections
println("Connect server started")

// Later, shut the server down, waiting up to 30 seconds for termination
SparkConnectService.stop(Some(30), Some(TimeUnit.SECONDS))
```

### Server Status and Monitoring

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Check active executions: Left carries the time of the last execution
// when none are currently active, Right carries the active executions.
SparkConnectService.listActiveExecutions match {
  case Left(lastExecutionTime) =>
    println(s"No active executions (last execution time: $lastExecutionTime)")
  case Right(executions) =>
    executions.foreach { exec =>
      println(s"Execution ${exec.operationId}: ${exec.status}")
    }
}

// Get the underlying gRPC server instance
val server = SparkConnectService.server
println(s"Server port: ${server.getPort}")
println(s"Server services: ${server.getServices.size()}")
```

## Configuration

Server behavior is controlled through Spark configuration properties. See the [Configuration](./configuration.md) documentation for details on available settings, including:

- Binding port and network settings
- Message size limits and timeouts
- Plugin class configuration
- UI and monitoring settings
- Security and authentication options
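
As an illustration, such settings can be passed as `--conf` options when launching the server; the keys below are examples and should be checked against the Configuration page for your Spark version:

```shell
# Illustrative: change the gRPC listening port and the inbound message size limit.
./sbin/start-connect-server.sh \
  --conf spark.connect.grpc.binding.port=15003 \
  --conf spark.connect.grpc.maxInboundMessageSize=134217728
```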

## Error Handling

All request handlers use centralized error handling through the ErrorUtils utility, which:

- Converts Spark exceptions to appropriate gRPC status codes
- Sanitizes error messages for client consumption
- Logs detailed error information for debugging
- Provides structured error responses with user and session context
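
The helper below is a simplified sketch of this pattern, not the actual ErrorUtils API; it shows how a handler body can be wrapped so exceptions are converted into gRPC statuses reported on the response stream:

```scala
import io.grpc.Status
import io.grpc.stub.StreamObserver

// Illustrative only: wraps a handler body, converting exceptions
// into gRPC status codes reported on the response observer.
def withErrorHandling[T](observer: StreamObserver[T])(body: => Unit): Unit = {
  try {
    body
  } catch {
    case e: IllegalArgumentException =>
      // A malformed request: surface as INVALID_ARGUMENT with the message intact.
      observer.onError(
        Status.INVALID_ARGUMENT.withDescription(e.getMessage).asRuntimeException())
    case e: Exception =>
      // Anything unexpected: return a sanitized INTERNAL error
      // (a real implementation would also log e server-side).
      observer.onError(
        Status.INTERNAL.withDescription("Internal server error").asRuntimeException())
  }
}
```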