# Session Management

SparkSession is the unified entry point for Spark SQL. It provides access to DataFrame creation, SQL execution, configuration management, and catalog operations, and replaces the legacy SQLContext and HiveContext with a single interface.

## SparkSession Creation

```scala { .api }
object SparkSession {
  def builder(): SparkSession.Builder
  def getActiveSession: Option[SparkSession]
  def getDefaultSession: Option[SparkSession]
  def setActiveSession(session: SparkSession): Unit
  def setDefaultSession(session: SparkSession): Unit
  def clearActiveSession(): Unit
  def clearDefaultSession(): Unit
}

// Builder is nested inside object SparkSession
class Builder {
  def appName(name: String): Builder
  def master(master: String): Builder
  def config(key: String, value: String): Builder
  def config(key: String, value: Long): Builder
  def config(key: String, value: Double): Builder
  def config(key: String, value: Boolean): Builder
  def config(conf: SparkConf): Builder
  def enableHiveSupport(): Builder
  def getOrCreate(): SparkSession
}
```

**Usage Example:**

```scala
import org.apache.spark.sql.SparkSession

// Create SparkSession with builder
val spark = SparkSession.builder()
  .appName("My Spark Application")
  .master("local[4]")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .enableHiveSupport()
  .getOrCreate()

// Get the currently active session, if any (returns Option[SparkSession])
val existingSession = SparkSession.getActiveSession
```
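
`getOrCreate()` reuses the active session bound to the current thread if one exists, falls back to the default session, and only builds a new one otherwise. A minimal sketch of the active/default bookkeeping, using only the companion-object methods listed above:

```scala
// getOrCreate() returns the session that already exists
// (assuming no other session was made active on this thread)
val same = SparkSession.builder().getOrCreate()
assert(same eq spark)

// Bind a session to the current thread, then inspect the binding
SparkSession.setActiveSession(spark)
assert(SparkSession.getActiveSession.contains(spark))

// Clear the thread-local binding; the default session is unaffected
SparkSession.clearActiveSession()
```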

## Core Session Operations

```scala { .api }
class SparkSession {
  def sql(sqlText: String): DataFrame
  def read: DataFrameReader
  def readStream: DataStreamReader
  def catalog: Catalog
  def conf: RuntimeConfig
  def sparkContext: SparkContext
  def version: String
  def sessionState: SessionState
  def sharedState: SharedState
  def stop(): Unit
  def close(): Unit
}
```

**Usage Examples:**

```scala
// Execute SQL queries
val result = spark.sql("SELECT * FROM my_table WHERE age > 25")
result.show()

// Access configuration
spark.conf.set("spark.sql.shuffle.partitions", "200")
val partitions = spark.conf.get("spark.sql.shuffle.partitions")

// Get Spark version
println(s"Spark version: ${spark.version}")

// Stop session
spark.stop()
```
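
The `catalog` handle listed in the API above is not demonstrated there; it exposes metastore metadata. A short sketch (run before `spark.stop()`; the table name is illustrative):

```scala
// List databases and tables known to the catalog
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()

// Check whether a table's data is cached in memory
val cached = spark.catalog.isCached("my_table")
```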

## Data Creation Methods

```scala { .api }
class SparkSession {
  def table(tableName: String): DataFrame
  def range(end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

  def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
  def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
  def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

  def createDataset[T : Encoder](data: Seq[T]): Dataset[T]
  def createDataset[T : Encoder](data: RDD[T]): Dataset[T]
  def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]

  def emptyDataFrame: DataFrame
  def emptyDataset[T : Encoder]: Dataset[T]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Create DataFrame from a sequence of case classes
case class Person(name: String, age: Int)
val people = Seq(Person("Alice", 25), Person("Bob", 30))
val peopleDF = spark.createDataFrame(people)

// Create DataFrame with an explicit schema
val schema = StructType(Array(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)
))
val rows = Seq(Row("Charlie", 35), Row("Diana", 28))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// Create a typed Dataset; the required Encoder comes from spark.implicits._
import spark.implicits._
val names = Seq("Alice", "Bob", "Charlie")
val namesDS = spark.createDataset(names)

// Create a range Dataset: odd numbers below 1,000,000 in 100 partitions
val numbers = spark.range(1, 1000000, 2, numPartitions = 100)

// Load an existing table
val tableDF = spark.table("my_database.my_table")
```
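
The empty variants pair naturally with the same encoders; a brief sketch reusing the Person case class from above:

```scala
// Empty DataFrame with no columns
val emptyDF = spark.emptyDataFrame

// Typed empty Dataset; the Person Encoder comes from spark.implicits._
val emptyPeople = spark.emptyDataset[Person]
```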

## Session Configuration

```scala { .api }
class RuntimeConfig {
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  def get(key: String): String
  def get(key: String, default: String): String
  def getOption(key: String): Option[String]
  def unset(key: String): Unit
  def getAll: Map[String, String]
  def isModifiable(key: String): Boolean
}
```

**Common Configuration Properties:**

```scala
// Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

// Broadcast join threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

// Core properties such as dynamic allocation and the serializer are not
// modifiable at runtime; they must be set when the SparkSession (and its
// SparkContext) is first created
val tunedSpark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// Check whether a property is modifiable at runtime
val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")
```
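
The read-side methods of `RuntimeConfig` round this out; a short sketch:

```scala
// Read with a fallback default, or as an Option when the key may be unset
val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold", "10MB")
val serializer = spark.conf.getOption("spark.serializer")

// Revert a key to its default, then dump every visible setting
spark.conf.unset("spark.sql.shuffle.partitions")
spark.conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
```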

## Session Lifecycle Management

```scala
// Multiple sessions (advanced usage). A second builder().getOrCreate()
// call would return the existing session, so use newSession() to get an
// isolated session that shares the same SparkContext
val session1 = SparkSession.builder()
  .appName("Session1")
  .master("local[2]")
  .getOrCreate()

val session2 = session1.newSession()

// Set the active session for the current thread
SparkSession.setActiveSession(session1)

// Clean shutdown: stop() halts the shared SparkContext, ending both sessions
session1.stop()

// Or use a loan pattern that guarantees shutdown
def withSparkSession[T](appName: String)(f: SparkSession => T): T = {
  val spark = SparkSession.builder()
    .appName(appName)
    .master("local[*]")
    .getOrCreate()
  try {
    f(spark)
  } finally {
    spark.stop()
  }
}

// Usage
val result = withSparkSession("MyApp") { spark =>
  spark.sql("SELECT 1 as test").collect()
}
```
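
Since `SparkSession` implements `Closeable`, the same guarantee is available on Scala 2.13+ through `scala.util.Using`; a minimal sketch:

```scala
import scala.util.Using

// Using.resource invokes close() (an alias for stop()) on exit
val rowCount = Using.resource(
  SparkSession.builder().appName("MyApp").master("local[*]").getOrCreate()
) { spark =>
  spark.range(100).count()
}
```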

## Session State and Shared State

```scala { .api }
// Session-specific state (per SparkSession)
trait SessionState {
  def catalog: SessionCatalog
  def analyzer: Analyzer
  def optimizer: Optimizer
  def planner: SparkPlanner
  def conf: SQLConf
}

// Shared state (across SparkSessions in the same SparkContext)
trait SharedState {
  def sparkContext: SparkContext
  def externalCatalog: ExternalCatalog
  def globalTempViewManager: GlobalTempViewManager
  def cacheManager: CacheManager
}
```

Session state and shared state expose internal Spark SQL components. They are unstable developer APIs, intended primarily for advanced use cases and debugging.
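
The split is easiest to observe through temp views: session-local views live in session state, while global temp views live in the shared `GlobalTempViewManager`. A sketch assuming two sessions over one SparkContext:

```scala
val sibling = spark.newSession()

// A session-local view is invisible to the sibling session
spark.range(5).createOrReplaceTempView("local_view")

// A global temp view lives in shared state, under the global_temp database
spark.range(5).createGlobalTempView("shared_view")
sibling.sql("SELECT * FROM global_temp.shared_view").show()
```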

## Integration with Hive

```scala
// Enable Hive support during session creation
val spark = SparkSession.builder()
  .appName("Hive Integration")
  .enableHiveSupport()
  .getOrCreate()

// Access Hive tables
val hiveTable = spark.table("hive_database.hive_table")

// Execute HiveQL
val result = spark.sql("SHOW TABLES IN hive_database")

// Specify Hive SerDes and file formats when defining a table; the "hive"
// format is only usable through table definitions, not spark.read.load
spark.sql("""
  CREATE TABLE hive_database.text_table (key INT, value STRING)
  USING hive
  OPTIONS (
    inputFormat 'org.apache.hadoop.mapred.TextInputFormat',
    outputFormat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
    serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
  )
""")
```
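
To verify which catalog backs the session, and to persist a DataFrame through the metastore (the table name is illustrative):

```scala
// "hive" when Hive support is enabled, "in-memory" otherwise
println(spark.conf.get("spark.sql.catalogImplementation"))

// Save a DataFrame as a managed table in the metastore
peopleDF.write.mode("overwrite").saveAsTable("hive_database.people")
```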