or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog.md · data-sources.md · data-types.md · dataframe-dataset.md · index.md · session-management.md · sql-functions.md · streaming.md · udfs.md

docs/session-management.md

0

# Apache Spark SQL - Session Management

1

2

## Capabilities

3

4

### SparkSession Creation and Configuration

5

- Create SparkSession instances using the builder pattern with flexible configuration options

6

- Configure Spark applications with runtime settings, SQL configurations, and custom properties

7

- Access application metadata, configuration state, and runtime information

8

- Manage application lifecycle including stopping sessions gracefully

9

10

### Runtime Configuration Management

11

- Set and retrieve runtime configuration properties during application execution

12

- Check configuration property mutability and reset configurations to defaults

13

- Handle both Spark SQL specific and general Spark configuration parameters

14

- Validate configuration values and handle configuration-related errors

15

16

### Application Context Access

17

- Access Spark application details including application ID, name, and start time

18

- Retrieve SparkContext for lower-level Spark functionality when needed

19

- Monitor application state and resource allocation information

20

21

## API Reference

22

23

### SparkSession Class

24

```scala { .api }

25

abstract class SparkSession extends Serializable with Closeable {

26

// Session properties

27

def sparkContext: SparkContext

28

def conf: RuntimeConfig

29

def version: String

30

31

// Data access interfaces

32

def read: DataFrameReader

33

def readStream: DataStreamReader

34

def streams: StreamingQueryManager

35

def catalog: Catalog

36

def udf: UDFRegistration

37

38

// SQL execution

39

def sql(sqlText: String): DataFrame

40

def table(tableName: String): DataFrame

41

42

// DataFrame/Dataset creation

43

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

44

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

45

def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

46

def createDataset[T : Encoder](data: Seq[T]): Dataset[T]

47

def createDataset[T : Encoder](data: RDD[T]): Dataset[T]

48

def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T]

49

50

// Session lifecycle

51

def stop(): Unit

52

def close(): Unit

53

def newSession(): SparkSession

54

}

55

```

56

57

### SparkSession.Builder Class

58

```scala { .api }

59

class Builder {

60

// Basic configuration

61

def appName(name: String): Builder

62

def master(master: String): Builder

63

def config(key: String, value: String): Builder

64

def config(key: String, value: Long): Builder

65

def config(key: String, value: Double): Builder

66

def config(key: String, value: Boolean): Builder

67

def config(conf: SparkConf): Builder

68

69

// Advanced configuration

70

def enableHiveSupport(): Builder

71

def withExtensions(f: SparkSessionExtensions => Unit): Builder

72

73

// Session creation

74

def getOrCreate(): SparkSession

75

}

76

```

77

78

### RuntimeConfig Class

79

```scala { .api }

80

abstract class RuntimeConfig {

81

// Configuration getters

82

def get(key: String): String

83

def get(key: String, default: String): String

84

def getOption(key: String): Option[String]

85

def getAll: Map[String, String]

86

87

// Configuration setters

88

def set(key: String, value: String): Unit

89

def set(key: String, value: Boolean): Unit

90

def set(key: String, value: Long): Unit

91

92

// Configuration management

93

def unset(key: String): Unit

94

def isModifiable(key: String): Boolean

95

}

96

```

97

98

### Supporting Types

99

100

#### SparkConf

101

```scala { .api }

102

class SparkConf(loadDefaults: Boolean = true) extends Cloneable with Serializable {

103

def set(key: String, value: String): SparkConf

104

def setAppName(name: String): SparkConf

105

def setMaster(master: String): SparkConf

106

def get(key: String): String

107

def get(key: String, defaultValue: String): String

108

def getOption(key: String): Option[String]

109

def getAll: Array[(String, String)]

110

def remove(key: String): SparkConf

111

}

112

```

113

114

#### SparkContext

115

```scala { .api }

116

class SparkContext(config: SparkConf) extends Logging {

117

def appName: String

118

def applicationId: String

119

def master: String

120

def deployMode: String

121

def version: String

122

def startTime: Long

123

def defaultParallelism: Int

124

def getConf: SparkConf

125

def isLocal: Boolean

126

def isStopped: Boolean

127

def stop(): Unit

128

}

129

```

130

131

## Usage Examples

132

133

### Creating a Basic SparkSession

134

```scala

135

import org.apache.spark.sql.SparkSession

136

137

val spark = SparkSession.builder()

138

.appName("My Spark Application")

139

.master("local[*]")

140

.config("spark.sql.warehouse.dir", "/path/to/warehouse")

141

.getOrCreate()

142

```

143

144

### Creating SparkSession with Custom Configuration

145

```scala

146

import org.apache.spark.SparkConf

147

import org.apache.spark.sql.SparkSession

148

149

val conf = new SparkConf()

150

.setAppName("Advanced Spark App")

151

.setMaster("yarn")

152

.set("spark.sql.adaptive.enabled", "true")

153

.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

154

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

155

156

val spark = SparkSession.builder()

157

.config(conf)

158

.enableHiveSupport()

159

.getOrCreate()

160

```

161

162

### Runtime Configuration Management

163

```scala

164

// Get current configuration

165

val currentWarehouse = spark.conf.get("spark.sql.warehouse.dir")

166

167

// Set configuration at runtime

168

spark.conf.set("spark.sql.shuffle.partitions", "200")

169

spark.conf.set("spark.sql.adaptive.enabled", true)

170

171

// Check if configuration can be modified

172

val canModify = spark.conf.isModifiable("spark.sql.shuffle.partitions")

173

174

// Get all configurations

175

val allConfigs = spark.conf.getAll

176

allConfigs.foreach { case (key, value) =>

177

println(s"$key = $value")

178

}

179

180

// Unset configuration (reset to default)

181

spark.conf.unset("spark.sql.shuffle.partitions")

182

```

183

184

### Session Extensions and Custom Configuration

185

```scala

186

import org.apache.spark.sql.SparkSessionExtensions

187

188

val spark = SparkSession.builder()

189

.appName("Extended Spark Session")

190

.withExtensions { extensions =>

191

// Register custom rules, functions, etc.

192

extensions.injectResolutionRule { session =>

193

// Custom resolution rule implementation

194

new CustomResolutionRule(session)

195

}

196

}

197

.config("spark.sql.extensions", "com.example.MySparkExtension")

198

.getOrCreate()

199

```

200

201

### Accessing Application Information

202

```scala

203

// Get application details

204

println(s"Application ID: ${spark.sparkContext.applicationId}")

205

println(s"Application Name: ${spark.sparkContext.appName}")

206

println(s"Spark Version: ${spark.version}")

207

println(s"Master URL: ${spark.sparkContext.master}")

208

println(s"Deploy Mode: ${spark.sparkContext.deployMode}")

209

println(s"Default Parallelism: ${spark.sparkContext.defaultParallelism}")

210

211

// Check application state

212

if (!spark.sparkContext.isStopped) {

213

println("Application is running")

214

}

215

```

216

217

### Managing Multiple Sessions

218

```scala

219

// Create primary session

220

val primarySpark = SparkSession.builder()

221

.appName("Primary Session")

222

.getOrCreate()

223

224

// Create isolated session with different configuration

225

val isolatedSpark = primarySpark.newSession()

226

isolatedSpark.conf.set("spark.sql.shuffle.partitions", "100")

227

228

// Both sessions share the same SparkContext but have independent configurations

229

println(s"Primary partitions: ${primarySpark.conf.get('spark.sql.shuffle.partitions')}")

230

println(s"Isolated partitions: ${isolatedSpark.conf.get('spark.sql.shuffle.partitions')}")

231

232

// Clean shutdown

233

isolatedSpark.close()

234

primarySpark.stop()

235

```

236

237

### Configuration Best Practices

238

```scala

239

import org.apache.spark.sql.SparkSession

240

241

// Production-ready configuration

242

val spark = SparkSession.builder()

243

.appName("Production Data Pipeline")

244

// Resource allocation

245

.config("spark.executor.memory", "4g")

246

.config("spark.executor.cores", "2")

247

.config("spark.executor.instances", "10")

248

249

// SQL optimizations

250

.config("spark.sql.adaptive.enabled", "true")

251

.config("spark.sql.adaptive.coalescePartitions.enabled", "true")

252

.config("spark.sql.adaptive.skewJoin.enabled", "true")

253

254

// Serialization

255

.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

256

.config("spark.sql.execution.arrow.pyspark.enabled", "true")

257

258

// Caching and checkpointing

259

.config("spark.sql.cache.serializer", "org.apache.spark.serializer.KryoSerializer")

260

.config("spark.checkpoint.compress", "true")

261

262

// Monitoring

263

.config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")

264

.config("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")

265

266

.getOrCreate()

267

268

// Set runtime configurations for specific workloads

269

if (isLargeDataset) {

270

spark.conf.set("spark.sql.shuffle.partitions", "400")

271

} else {

272

spark.conf.set("spark.sql.shuffle.partitions", "200")

273

}

274

```