
# Session Management

The session management system handles client sessions, execution state, event tracking, and resource isolation. Each client connection is associated with a session that maintains its own Spark session, configuration, and execution context.

## Core Session Components

### SessionHolder

Manages Spark session state and configuration for Connect clients.

```scala { .api }
class SessionHolder(
    userId: String,
    sessionId: String,
    session: SparkSession
) {
  def userId: String
  def sessionId: String
  def session: SparkSession
  def artifactManager: SparkConnectArtifactManager
  // Additional session management methods (internal)
}
```

**Properties:**

- `userId`: Unique identifier for the user
- `sessionId`: Unique identifier for the session
- `session`: The underlying Spark session
- `artifactManager`: Handles artifacts for this session

### SparkConnectExecutionManager

Global tracker of all `ExecuteHolder` executions across all sessions.

```scala { .api }
class SparkConnectExecutionManager() {
  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]]
  def listAbandonedExecutions: Seq[ExecuteInfo]
}
```

**Key Methods:**

- `listActiveExecutions`: Returns `Left` with the timestamp of the last execution when no executions are active, or `Right` with the list of all active executions
- `listAbandonedExecutions`: Returns the list of executions that were abandoned and removed by periodic maintenance

**Note:** This is a global manager accessed through `SparkConnectService.executionManager`. Individual execution creation and management is handled through internal methods not exposed in the public API.

### ExecuteInfo

Information about an execution returned by the execution manager.

```scala { .api }
case class ExecuteInfo(
    request: proto.ExecutePlanRequest,
    userId: String,
    sessionId: String,
    operationId: String,
    jobTag: String,
    sparkSessionTags: Set[String],
    reattachable: Boolean,
    status: ExecuteStatus,
    creationTime: Long,
    lastAttachedRpcTime: Option[Long],
    closedTime: Option[Long]
)
```

**Properties:**

- `request`: The original execution request
- `userId`: User who initiated the execution
- `sessionId`: Session containing the execution
- `operationId`: Unique identifier for the operation
- `jobTag`: Spark job tag for tracking
- `sparkSessionTags`: Tags associated with the Spark session
- `reattachable`: Whether the execution supports reattachment
- `status`: Current execution status
- `creationTime`: When the execution was created (timestamp)
- `lastAttachedRpcTime`: Last time an RPC was attached (if any)
- `closedTime`: When the execution was closed (if closed)
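These fields can be inspected from the results of `listActiveExecutions` on the global execution manager described in this document. A minimal sketch (requires a running Spark Connect server):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Inspect metadata for each active execution
SparkConnectService.executionManager.listActiveExecutions match {
  case Right(executions) =>
    executions.foreach { info =>
      val attached = info.lastAttachedRpcTime.map(_.toString).getOrElse("never")
      println(s"op=${info.operationId} user=${info.userId} " +
        s"status=${info.status} reattachable=${info.reattachable} lastAttached=$attached")
    }
  case Left(lastTime) =>
    println(s"No active executions; last one seen at $lastTime")
}
```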

### ExecuteHolder

Holds execution state and manages the execution lifecycle.

```scala { .api }
class ExecuteHolder(
    executeId: String,
    request: proto.ExecutePlanRequest,
    sessionHolder: SessionHolder
) {
  def executeId: String
  def sessionHolder: SessionHolder
  def request: proto.ExecutePlanRequest
  def createdTime: Long
  def startTime: Option[Long]
  def status: ExecuteStatus
  // Additional execution state methods (internal)
}
```

## Event Management

### SessionEventsManager

Manages session-level events and monitoring.

```scala { .api }
class SessionEventsManager(sessionHolder: SessionHolder) {
  def recordSessionStart(): Unit
  def recordSessionEnd(): Unit
  def recordConfigChange(key: String, value: String): Unit
  def getSessionMetrics: SessionMetrics
}
```

### ExecuteEventsManager

Manages execution-level events and state tracking.

```scala { .api }
class ExecuteEventsManager(executeHolder: ExecuteHolder) {
  def recordExecutionStart(): Unit
  def recordExecutionEnd(success: Boolean): Unit
  def recordError(error: Throwable): Unit
  def getExecutionMetrics: ExecutionMetrics
}
```

## Streaming Query Management

### SparkConnectStreamingQueryCache

Caches streaming queries for client access and management.

```scala { .api }
class SparkConnectStreamingQueryCache(sessionHolder: SessionHolder) {
  def registerQuery(queryId: String, query: StreamingQuery): Unit
  def getQuery(queryId: String): Option[StreamingQuery]
  def removeQuery(queryId: String): Option[StreamingQuery]
  def listActiveQueries: Seq[StreamingQuery]
  def stopQuery(queryId: String): Boolean
}
```

## Session Access and Lifecycle

### Session Creation and Access

Sessions are managed through the `SparkConnectService` companion object:

```scala { .api }
object SparkConnectService {
  def getOrCreateIsolatedSession(userId: String, sessionId: String): SessionHolder
  def getIsolatedSession(userId: String, sessionId: String): SessionHolder
  def removeSession(userId: String, sessionId: String): Option[SessionHolder]
}
```

**Key Methods:**

- `getOrCreateIsolatedSession`: Get an existing session or create a new one
- `getIsolatedSession`: Get an existing session; fails if the session does not exist (note the signature returns `SessionHolder`, not an `Option`)
- `removeSession`: Clean up and remove a session, returning it if it existed

## Usage Examples

### Creating and Managing Sessions

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Create or get existing session
val sessionHolder = SparkConnectService.getOrCreateIsolatedSession(
  userId = "user123",
  sessionId = "session456"
)

// Access the underlying Spark session
val sparkSession = sessionHolder.session

// Configure the session
sparkSession.conf.set("spark.sql.adaptive.enabled", "true")
sparkSession.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```

### Managing Executions

### Managing Executions

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Access the global execution manager
val executionManager = SparkConnectService.executionManager

// List active executions: Left holds the timestamp of the last execution
// (when none are active), Right holds the list of active executions
executionManager.listActiveExecutions match {
  case Right(executions) => println(s"Active executions: ${executions.length}")
  case Left(lastExecutionTime) => println(s"No active executions; last seen at $lastExecutionTime")
}

// List executions that were abandoned and removed by periodic maintenance
val abandoned = executionManager.listAbandonedExecutions
abandoned.foreach { info =>
  println(s"Abandoned operation ${info.operationId} (status: ${info.status})")
}
```

Execution creation and interruption are handled through internal methods that are not exposed in the public API.

### Streaming Query Management

### Streaming Query Management

```scala
import org.apache.spark.sql.connect.service.SparkConnectStreamingQueryCache
import org.apache.spark.sql.streaming.StreamingQuery

// Create a streaming query cache for the session
val queryCache = new SparkConnectStreamingQueryCache(sessionHolder)

// Register a streaming query
val query: StreamingQuery = ??? // a previously created streaming query
queryCache.registerQuery("query1", query)

// List active streaming queries
val activeQueries = queryCache.listActiveQueries
activeQueries.foreach { query =>
  println(s"Query ${query.id}: ${query.status}")
}

// Stop a specific query
val stopped = queryCache.stopQuery("query1")
```

### Event Tracking

### Event Tracking

```scala
import org.apache.spark.sql.connect.service.{SessionEventsManager, ExecuteEventsManager}

// Track session events (sessionHolder as obtained in the earlier example)
val sessionEvents = new SessionEventsManager(sessionHolder)
sessionEvents.recordSessionStart()
sessionEvents.recordConfigChange("spark.sql.adaptive.enabled", "true")

// Track execution events (executeHolder comes from internal execution creation)
val executeEvents = new ExecuteEventsManager(executeHolder)
executeEvents.recordExecutionStart()

// Later, after execution completes
executeEvents.recordExecutionEnd(success = true)

// Get metrics
val sessionMetrics = sessionEvents.getSessionMetrics
val executionMetrics = executeEvents.getExecutionMetrics
```

## Session Isolation

Each session provides complete isolation from other sessions:

### Resource Isolation

- **Spark Session**: Each Connect session has its own SparkSession instance
- **Configuration**: Session-specific Spark configuration settings
- **Artifacts**: Isolated JAR files and class loaders per session
- **Temporary Views**: Session-scoped temporary views and UDFs

### State Isolation

- **Execution Context**: Independent execution contexts and thread pools
- **Streaming Queries**: Session-specific streaming query management
- **Metrics**: Separate metrics collection per session
- **Error Handling**: Session-scoped error reporting and logging
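The isolation guarantees can be sketched with two sessions created through `SparkConnectService` (user and session IDs here are hypothetical):

```scala
import org.apache.spark.sql.connect.service.SparkConnectService

// Two sessions for different users: each holds its own SparkSession
val holderA = SparkConnectService.getOrCreateIsolatedSession("alice", "session-a")
val holderB = SparkConnectService.getOrCreateIsolatedSession("bob", "session-b")

// Configuration set on one session does not leak into the other
holderA.session.conf.set("spark.sql.shuffle.partitions", "8")
holderB.session.conf.set("spark.sql.shuffle.partitions", "64")

// Temporary views are likewise session-scoped: visible only in session A
holderA.session.range(10).createOrReplaceTempView("numbers")
```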

## Concurrency and Thread Safety

The session management system handles concurrent access safely:

### Thread Safety

- All session operations are thread-safe
- Execution state is protected with appropriate synchronization
- Event recording is atomic and thread-safe
- Query cache operations are synchronized

### Concurrent Executions

- Multiple executions can run simultaneously within a session
- Execution resources are managed independently
- Reattachable executions support fault tolerance
- Streaming queries run concurrently with batch operations
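Concurrent use within one session can be sketched with Scala futures; the workload below is illustrative only and assumes a `sessionHolder` obtained as in the usage examples above:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Two batch jobs submitted concurrently against one session's SparkSession;
// per the thread-safety notes above, this is safe within a session
val spark = sessionHolder.session
val jobA = Future { spark.range(1000000).count() }
val jobB = Future { spark.range(500000).filter("id % 2 = 0").count() }

val countA = Await.result(jobA, 5.minutes)
val countB = Await.result(jobB, 5.minutes)
```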

## Resource Management

### Memory Management

- Session-scoped memory limits and monitoring
- Automatic cleanup of completed executions
- Streaming query resource tracking
- Artifact garbage collection

### Cleanup and Lifecycle

```scala
// Sessions are automatically cleaned up when:
// 1. Client disconnects gracefully
// 2. Session timeout is reached
// 3. Explicit session termination
// 4. Server shutdown

// Manual cleanup example
val removed = SparkConnectService.removeSession(userId, sessionId)
removed.foreach { holder =>
  holder.session.stop()
  println(s"Session ${holder.sessionId} cleaned up")
}
```

## Configuration and Tuning

### Session Configuration

Key configuration options for session management:

- `spark.connect.session.timeout`: Session idle timeout
- `spark.connect.execution.maxConcurrent`: Maximum concurrent executions per session
- `spark.connect.streaming.maxQueries`: Maximum streaming queries per session
- `spark.connect.artifacts.maxSize`: Maximum artifact cache size per session
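These options would typically be set on the server's Spark configuration before starting the Connect service; the values below are purely illustrative and should be tuned for your workload:

```scala
import org.apache.spark.SparkConf

// Illustrative values only; keys as listed above
val conf = new SparkConf()
  .set("spark.connect.session.timeout", "30m")
  .set("spark.connect.execution.maxConcurrent", "8")
  .set("spark.connect.streaming.maxQueries", "4")
  .set("spark.connect.artifacts.maxSize", "512m")
```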

### Performance Tuning

- Configure appropriate timeouts for long-running operations
- Tune concurrent execution limits based on cluster resources
- Monitor session metrics for resource usage patterns
- Implement custom cleanup policies for inactive sessions

## Error Handling and Recovery

331

332

### Session Recovery

333

334

- Sessions can be recovered after temporary disconnections

335

- Reattachable executions provide fault tolerance

336

- Streaming queries automatically recover from failures

337

- Artifact state is persisted across reconnections

338

339

### Error Reporting

- Session-level errors are reported with context
- Execution errors include session and user information
- Structured error messages for client consumption
- Detailed server-side logging for debugging