# Server Management

This document covers the core server functionality including lifecycle management, service configuration, and standalone deployment options.

## Core Server Classes

### SparkConnectService

Main gRPC service implementation that handles all client requests.

```scala { .api }
class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  def executePlan(request: proto.ExecutePlanRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def analyzePlan(request: proto.AnalyzePlanRequest, responseObserver: StreamObserver[proto.AnalyzePlanResponse]): Unit
  def config(request: proto.ConfigRequest, responseObserver: StreamObserver[proto.ConfigResponse]): Unit
  def addArtifacts(responseObserver: StreamObserver[proto.AddArtifactsResponse]): StreamObserver[proto.AddArtifactsRequest]
  def artifactStatus(request: proto.ArtifactStatusesRequest, responseObserver: StreamObserver[proto.ArtifactStatusesResponse]): Unit
  def interrupt(request: proto.InterruptRequest, responseObserver: StreamObserver[proto.InterruptResponse]): Unit
  def reattachExecute(request: proto.ReattachExecuteRequest, responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit
  def releaseExecute(request: proto.ReleaseExecuteRequest, responseObserver: StreamObserver[proto.ReleaseExecuteResponse]): Unit
}
```

### SparkConnectService Companion Object

Server lifecycle management and session utilities.

```scala { .api }
object SparkConnectService {
  def start(sc: SparkContext): Unit
  def stop(timeout: Option[Long] = None, unit: Option[TimeUnit] = None): Unit
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def server: Server
  def uiTab: Option[SparkConnectServerTab]
}
```

### SparkConnectServer

Standalone server application entry point.

```scala { .api }
object SparkConnectServer {
  def main(args: Array[String]): Unit
}
```

### SimpleSparkConnectService

Simplified service for testing and development.

```scala { .api }
object SimpleSparkConnectService {
  def main(args: Array[String]): Unit
}
```

## Request Handlers

The server delegates request processing to specialized handler classes:

### SparkConnectExecutePlanHandler

```scala { .api }
class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ExecutePlanRequest): Unit
}
```

### SparkConnectAnalyzeHandler

```scala { .api }
class SparkConnectAnalyzeHandler(responseObserver: StreamObserver[proto.AnalyzePlanResponse]) {
  def handle(request: proto.AnalyzePlanRequest): Unit
}
```

### SparkConnectConfigHandler

```scala { .api }
class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) {
  def handle(request: proto.ConfigRequest): Unit
}
```

### SparkConnectAddArtifactsHandler

```scala { .api }
class SparkConnectAddArtifactsHandler(responseObserver: StreamObserver[proto.AddArtifactsResponse]) {
  def handle(request: proto.AddArtifactsRequest): Unit
}
```

### SparkConnectArtifactStatusesHandler

```scala { .api }
class SparkConnectArtifactStatusesHandler(responseObserver: StreamObserver[proto.ArtifactStatusesResponse]) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}
```

### SparkConnectInterruptHandler

```scala { .api }
class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.InterruptResponse]) {
  def handle(request: proto.InterruptRequest): Unit
}
```

### SparkConnectReattachExecuteHandler

```scala { .api }
class SparkConnectReattachExecuteHandler(responseObserver: StreamObserver[proto.ExecutePlanResponse]) {
  def handle(request: proto.ReattachExecuteRequest): Unit
}
```

### SparkConnectReleaseExecuteHandler

```scala { .api }
class SparkConnectReleaseExecuteHandler(responseObserver: StreamObserver[proto.ReleaseExecuteResponse]) {
  def handle(request: proto.ReleaseExecuteRequest): Unit
}
```
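
The handler signatures in this section share one shape: a handler is constructed around the call's response observer and then asked to `handle` a single request. The following is a minimal sketch of that delegation pattern in plain Scala. It is not the Spark implementation: `StreamObserver` here is a local stand-in for the gRPC interface, and `ConfigRequest`, `ConfigResponse`, `ConfigHandler`, `Service`, and `RecordingObserver` are hypothetical names introduced only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Local stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical request/response types standing in for the generated proto classes.
final case class ConfigRequest(sessionId: String, key: String)
final case class ConfigResponse(sessionId: String, value: Option[String])

// One handler per call: it owns the observer and answers a single request.
class ConfigHandler(
    responseObserver: StreamObserver[ConfigResponse],
    settings: Map[String, String]) {
  def handle(request: ConfigRequest): Unit = {
    responseObserver.onNext(ConfigResponse(request.sessionId, settings.get(request.key)))
    responseObserver.onCompleted()
  }
}

// The service method only wires the observer to a fresh handler and forwards failures.
class Service(settings: Map[String, String]) {
  def config(request: ConfigRequest, responseObserver: StreamObserver[ConfigResponse]): Unit = {
    try {
      new ConfigHandler(responseObserver, settings).handle(request)
    } catch {
      case NonFatal(e) => responseObserver.onError(e)
    }
  }
}

// Records everything the service emits so the flow can be inspected.
class RecordingObserver[T] extends StreamObserver[T] {
  val values: ArrayBuffer[T] = ArrayBuffer.empty[T]
  var completed: Boolean = false
  var error: Option[Throwable] = None
  def onNext(value: T): Unit = { values += value }
  def onError(t: Throwable): Unit = { error = Some(t) }
  def onCompleted(): Unit = { completed = true }
}

object DelegationDemo {
  def run(): RecordingObserver[ConfigResponse] = {
    val observer = new RecordingObserver[ConfigResponse]
    new Service(Map("spark.sql.shuffle.partitions" -> "200"))
      .config(ConfigRequest("session-1", "spark.sql.shuffle.partitions"), observer)
    observer
  }
}
```

Keeping the service method this thin means each RPC's logic can be tested through its handler alone, with a recording observer in place of a live gRPC channel.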

## Interceptor System

The server supports gRPC interceptors for cross-cutting concerns.

### SparkConnectInterceptorRegistry

```scala { .api }
object SparkConnectInterceptorRegistry {
  def chainInterceptors(sb: NettyServerBuilder): Unit
  def createConfiguredInterceptors(): Seq[ServerInterceptor]
}
```

### Built-in Interceptors

```scala { .api }
class LoggingInterceptor extends ServerInterceptor

class LocalPropertiesCleanupInterceptor extends ServerInterceptor
```
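
A custom interceptor implements the same `io.grpc.ServerInterceptor` interface as the built-in ones. The sketch below is a hypothetical example (`CallCountingInterceptor` is not part of Spark) that counts incoming calls and passes each one through unchanged. It needs the gRPC API library on the classpath, and it assumes the registry instantiates configured interceptors through a no-argument constructor from class names listed in the `spark.connect.grpc.interceptor.classes` setting.

```scala
import java.util.concurrent.atomic.AtomicLong

import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

// Hypothetical interceptor: logs and counts every call, then delegates unchanged.
class CallCountingInterceptor extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    val n = CallCountingInterceptor.calls.incrementAndGet()
    // The full method name identifies which RPC is being invoked.
    val method = call.getMethodDescriptor.getFullMethodName
    println(s"call #$n -> $method")
    // Hand the call to the next interceptor (or the service) without modification.
    next.startCall(call, headers)
  }
}

object CallCountingInterceptor {
  // Shared across instances so the count is process-wide.
  val calls = new AtomicLong(0)
}
```

Because the interceptor returns whatever `next.startCall` returns, it observes calls without changing request or response handling.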

## Usage Examples

### Embedded Server

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.service.SparkConnectService

// Create Spark session.
// Note: SparkConnectPlugin is a driver plugin registered through spark.plugins,
// not spark.sql.extensions, and it starts the service on its own. It is left
// out here because this example starts the service explicitly.
val spark = SparkSession.builder()
  .appName("MyApp")
  .getOrCreate()

// Start Connect server on the existing SparkContext
SparkConnectService.start(spark.sparkContext)

// Server is now listening for client connections
println("Connect server started")

// Later, shut down the server with a 30-second timeout
SparkConnectService.stop(Some(30), Some(TimeUnit.SECONDS))
```

### Server Status and Monitoring

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Check active executions
SparkConnectService.listActiveExecutions match {
  // Left means no executions are active; the value is a timestamp in
  // milliseconds of the last activity, not a count
  case Left(lastActiveMs) =>
    println(s"No active executions (last activity at $lastActiveMs ms)")
  case Right(executions) =>
    executions.foreach { exec =>
      // ExecuteInfo identifies an execution by its operationId
      println(s"Execution ${exec.operationId}: ${exec.status}")
    }
}

// Get server instance
val server = SparkConnectService.server
println(s"Server port: ${server.getPort}")
println(s"Server services: ${server.getServices.size()}")
```

## Configuration

Server behavior is controlled through Spark configuration properties. See the [Configuration](./configuration.md) documentation for details on available settings including:

- Binding port and network settings
- Message size limits and timeouts
- Plugin class configuration
- UI and monitoring settings
- Security and authentication options
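
As a rough illustration of the first three categories, the sketch below collects a few Spark Connect server properties in a plain Scala map and renders them as `--conf` arguments. The property keys are Spark Connect settings; the values are illustrative assumptions, `com.example.AuditInterceptor` is a hypothetical class name, and `ConnectSettingsSketch` and `toSubmitArgs` are helper names invented for this example. The linked Configuration page remains the reference for exact keys and defaults.

```scala
object ConnectSettingsSketch {
  // Illustrative values only (assumptions), keyed by Spark Connect property name.
  val settings: Map[String, String] = Map(
    // Port the gRPC server binds to
    "spark.connect.grpc.binding.port" -> "15002",
    // Upper bound on the size of a single inbound gRPC message, in bytes (128 MiB here)
    "spark.connect.grpc.maxInboundMessageSize" -> (128 * 1024 * 1024).toString,
    // Comma-separated interceptor classes; the class named here is hypothetical
    "spark.connect.grpc.interceptor.classes" -> "com.example.AuditInterceptor"
  )

  // Render settings as command-line arguments, sorted by key for stable output.
  def toSubmitArgs(props: Map[String, String]): Seq[String] =
    props.toSeq.sortBy(_._1).flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```

The same key/value pairs could instead be set on a `SparkSession` builder or in `spark-defaults.conf`; the map form is used here only to keep the example free of Spark dependencies.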

## Error Handling

All request handlers use centralized error handling through the ErrorUtils utility, which:

- Converts Spark exceptions to appropriate gRPC status codes
- Sanitizes error messages for client consumption
- Logs detailed error information for debugging
- Provides structured error responses with user and session context
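
The four responsibilities above can be sketched in plain Scala as follows. This is a simplified model, not the actual `ErrorUtils` code: the `Code` values are local stand-ins for a few gRPC status codes, the exception-to-code mapping and the message length limit are assumptions chosen for illustration, and `ErrorMappingSketch`, `ErrorResponse`, `classify`, `sanitize`, and `handle` are hypothetical names.

```scala
import scala.util.control.NonFatal

object ErrorMappingSketch {
  // Local stand-ins for a few gRPC status codes.
  sealed trait Code
  case object InvalidArgument extends Code
  case object NotFound extends Code
  case object Internal extends Code

  // Structured response carrying the user and session context.
  final case class ErrorResponse(code: Code, message: String, userId: String, sessionId: String)

  // Assumed cap on the message length sent to clients.
  private val MaxMessageLength = 2048

  // Convert an exception to a status code (illustrative mapping only).
  def classify(t: Throwable): Code = t match {
    case _: IllegalArgumentException => InvalidArgument
    case _: NoSuchElementException => NotFound
    case _ => Internal
  }

  // Keep only the first line of the message and cap its length.
  def sanitize(t: Throwable): String = {
    val raw = Option(t.getMessage).getOrElse(t.getClass.getSimpleName)
    raw.split('\n').headOption.getOrElse("").take(MaxMessageLength)
  }

  // Run a request body; on failure, log the full error and return a structured response.
  def handle[T](userId: String, sessionId: String)(body: => T): Either[ErrorResponse, T] =
    try {
      Right(body)
    } catch {
      case NonFatal(t) =>
        // The detailed error goes to the server log, not to the client.
        System.err.println(s"[user=$userId session=$sessionId] request failed: $t")
        Left(ErrorResponse(classify(t), sanitize(t), userId, sessionId))
    }
}
```

For example, `ErrorMappingSketch.handle[Int]("u1", "s1") { throw new IllegalArgumentException("bad plan\ndetail") }` yields a `Left` whose code is `InvalidArgument` and whose message is `bad plan`, while the second line of the message appears only in the server log.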